pkg/k8s/watcher: fix deadlock with service event handler & CES watcher.
A deadlock can occur when a k8s service update and a policy update are processed at the same time.

In practice, this can occur in the following situation:

1. The CiliumEndpointSlice (CES) k8s watcher handles a new watcher event.
   The handler first takes a lock on the IPCache, then triggers an endpoint
   regeneration via the endpoint manager.

   Note: This code path waits for endpoint regeneration to complete via a
         passed WaitGroup.

   To complete the regeneration, the endpoint manager attempts to lock the
   policyRepository. Effectively, the CES handler has locking dependencies on
   both the IPCache lock and the policyRepo lock (transitively, by waiting on
   the endpoint manager's regeneration). It does not release the IPCache lock
   until the regeneration is done, and so it waits on the policyRepo lock while
   holding the IPCache lock.

2. The k8sServiceHandler control loop processes an update caused by a
   kube-apiserver service record change (common on EKS, where the control
   plane IPs change often).
   A new policy.Translator is constructed as a k8s.RuleTranslator{} with
   AllocatePrefixes enabled. This Translator implementation holds a reference
   to the ipcache and uses it to apply the necessary prefix updates during
   translation. The translator is passed to the policyRepository to perform
   policy rule translation; the repository locks itself before calling
   translator.Translate(...) on its state.

   The k8sServiceHandler therefore holds nested locks: policyRepo -> ipcache.

At this point, suppose codepath 1 is holding the ipcache lock and waiting for the
policyRepo lock (nested ipcache -> policyRepo).

At the same time, codepath 2 (the k8sServiceHandler) has just grabbed the policyRepo
lock and is waiting for the ipcache lock.

Codepath 2 (which holds policyRepo) needs codepath 1 to release ipcache, while
codepath 1 is in turn waiting for codepath 2 to release policyRepo: a deadlock.
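
Reduced to its essentials, this is a lock-ordering inversion. The standalone Go
sketch below (hypothetical names, not Cilium code; two plain mutexes stand in for
the IPCache and policyRepo locks) reproduces the hang:

// A minimal, hypothetical reduction of the inversion above. ipcacheMu and
// policyRepoMu are plain mutexes standing in for the IPCache and
// policy.Repository locks; none of this is Cilium's real code.
package main

import (
	"fmt"
	"sync"
	"time"
)

var (
	ipcacheMu    sync.Mutex // stands in for the IPCache lock
	policyRepoMu sync.Mutex // stands in for the policy.Repository lock
)

// cesHandler mimics codepath 1: take the IPCache lock first, then (via
// endpoint regeneration) the policy repository lock.
func cesHandler(wg *sync.WaitGroup) {
	defer wg.Done()
	ipcacheMu.Lock()
	defer ipcacheMu.Unlock()
	time.Sleep(10 * time.Millisecond) // widen the race window
	policyRepoMu.Lock()               // blocks forever: codepath 2 holds it
	defer policyRepoMu.Unlock()
	fmt.Println("CES handler finished")
}

// serviceHandler mimics codepath 2: take the policy repository lock first,
// then the IPCache lock from inside the translator.
func serviceHandler(wg *sync.WaitGroup) {
	defer wg.Done()
	policyRepoMu.Lock()
	defer policyRepoMu.Unlock()
	time.Sleep(10 * time.Millisecond)
	ipcacheMu.Lock() // blocks forever: codepath 1 holds it
	defer ipcacheMu.Unlock()
	fmt.Println("service handler finished")
}

func main() {
	var wg sync.WaitGroup
	wg.Add(2)
	go cesHandler(&wg)
	go serviceHandler(&wg)
	wg.Wait() // the Go runtime reports "all goroutines are asleep - deadlock!"
}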

The following are sample stack traces from such a case:

101 occurrences. Sample stack trace:
sync.runtime_SemacquireMutex(0xc0018f0e08?, 0x20?, 0xc000c12740?)
	/usr/local/go/src/runtime/sema.go:71 +0x25
sync.(*RWMutex).RLock(...)
	/usr/local/go/src/sync/rwmutex.go:63
github.com/cilium/cilium/pkg/endpoint.(*Endpoint).regeneratePolicy(0xc0010c7c00)
	/go/src/github.com/cilium/cilium/pkg/endpoint/policy.go:198 +0x11a
github.com/cilium/cilium/pkg/endpoint.(*Endpoint).runPreCompilationSteps(0xc0010c7c00, 0xc0005be400)
	/go/src/github.com/cilium/cilium/pkg/endpoint/bpf.go:814 +0x2c5
github.com/cilium/cilium/pkg/endpoint.(*Endpoint).regenerateBPF(0xc0010c7c00, 0xc0005be400)
	/go/src/github.com/cilium/cilium/pkg/endpoint/bpf.go:584 +0x189
github.com/cilium/cilium/pkg/endpoint.(*Endpoint).regenerate(0xc0010c7c00, 0xc0005be400)
	/go/src/github.com/cilium/cilium/pkg/endpoint/policy.go:398 +0x7a5
github.com/cilium/cilium/pkg/endpoint.(*EndpointRegenerationEvent).Handle(0xc0099405b0, 0x2a27540?)
	/go/src/github.com/cilium/cilium/pkg/endpoint/events.go:53 +0x325
github.com/cilium/cilium/pkg/eventqueue.(*EventQueue).run.func1()
	/go/src/github.com/cilium/cilium/pkg/eventqueue/eventqueue.go:245 +0x13b
sync.(*Once).doSlow(0x2f14d01?, 0x4422a5?)
	/usr/local/go/src/sync/once.go:68 +0xc2
sync.(*Once).Do(...)
	/usr/local/go/src/sync/once.go:59
github.com/cilium/cilium/pkg/eventqueue.(*EventQueue).run(0x0?)
	/go/src/github.com/cilium/cilium/pkg/eventqueue/eventqueue.go:233 +0x45
created by github.com/cilium/cilium/pkg/eventqueue.(*EventQueue).Run
	/go/src/github.com/cilium/cilium/pkg/eventqueue/eventqueue.go:229 +0x76

1 occurrence. Sample stack trace:
sync.runtime_Semacquire(0xc0003f44d0?)
	/usr/local/go/src/runtime/sema.go:56 +0x25
sync.(*WaitGroup).Wait(0xc0003f5420?)
	/usr/local/go/src/sync/waitgroup.go:136 +0x52
github.com/cilium/cilium/pkg/ipcache.(*IPCache).UpdatePolicyMaps(0xc001003580, {0x3468338, 0xc00007e038}, 0xa?, 0xc008c15e60)
	/go/src/github.com/cilium/cilium/pkg/ipcache/metadata.go:235 +0xc7
github.com/cilium/cilium/pkg/ipcache.(*IPCache).removeLabelsFromIPs(0xc001003580, 0xc005d73778?, {0x2f35b2b, 0xf})
	/go/src/github.com/cilium/cilium/pkg/ipcache/metadata.go:414 +0x7c5
github.com/cilium/cilium/pkg/ipcache.(*IPCache).RemoveLabelsExcluded(0xc001003580, 0xc0000e3110, 0xc001506dd8?, {0x2f35b2b, 0xf})
	/go/src/github.com/cilium/cilium/pkg/ipcache/metadata.go:328 +0x1ab
github.com/cilium/cilium/pkg/k8s/watchers.(*K8sWatcher).handleKubeAPIServerServiceEPChanges(0xc001586d80, 0xc003ec89b0?)
	/go/src/github.com/cilium/cilium/pkg/k8s/watchers/endpoint.go:135 +0x5b
github.com/cilium/cilium/pkg/k8s/watchers.(*K8sWatcher).addKubeAPIServerServiceEPSliceV1(0xf3c386?, 0xc001ab7d40)
	/go/src/github.com/cilium/cilium/pkg/k8s/watchers/endpoint_slice.go:205 +0x452
github.com/cilium/cilium/pkg/k8s/watchers.(*K8sWatcher).updateK8sEndpointSliceV1(0xc001586d80, 0xc001ab7d40?, 0xc001ab7d40?)
	/go/src/github.com/cilium/cilium/pkg/k8s/watchers/endpoint_slice.go:178 +0x69
github.com/cilium/cilium/pkg/k8s/watchers.(*K8sWatcher).endpointSlicesInit.func2({0x2ec7ea0?, 0xc00294c410?}, {0x2ec7ea0, 0xc001ab7d40})
	/go/src/github.com/cilium/cilium/pkg/k8s/watchers/endpoint_slice.go:71 +0x125
k8s.io/client-go/tools/cache.ResourceEventHandlerFuncs.OnUpdate(...)
	/go/src/github.com/cilium/cilium/vendor/k8s.io/client-go/tools/cache/controller.go:239
github.com/cilium/cilium/pkg/k8s/informer.NewInformerWithStore.func1({0x2a4b9c0?, 0xc00057d1e8?})
	/go/src/github.com/cilium/cilium/pkg/k8s/informer/informer.go:103 +0x2fe
k8s.io/client-go/tools/cache.(*DeltaFIFO).Pop(0xc001b805a0, 0xc000927940)
	/go/src/github.com/cilium/cilium/vendor/k8s.io/client-go/tools/cache/delta_fifo.go:554 +0x566
k8s.io/client-go/tools/cache.(*controller).processLoop(0xc001bda1b0)
	/go/src/github.com/cilium/cilium/vendor/k8s.io/client-go/tools/cache/controller.go:184 +0x36
k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1(0x40d6a5?)
	/go/src/github.com/cilium/cilium/vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:155 +0x3e
k8s.io/apimachinery/pkg/util/wait.BackoffUntil(0xed53e5?, {0x343e1c0, 0xc000d50450}, 0x1, 0xc000929980)
	/go/src/github.com/cilium/cilium/vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:156 +0xb6
k8s.io/apimachinery/pkg/util/wait.JitterUntil(0xc001bda218?, 0x3b9aca00, 0x0, 0x30?, 0x7f587b87fd30?)
	/go/src/github.com/cilium/cilium/vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:133 +0x89
k8s.io/apimachinery/pkg/util/wait.Until(...)
	/go/src/github.com/cilium/cilium/vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:90
k8s.io/client-go/tools/cache.(*controller).Run(0xc001bda1b0, 0xc000929980)
	/go/src/github.com/cilium/cilium/vendor/k8s.io/client-go/tools/cache/controller.go:155 +0x2c5
created by github.com/cilium/cilium/pkg/k8s/watchers.(*K8sWatcher).endpointSlicesInit
	/go/src/github.com/cilium/cilium/pkg/k8s/watchers/endpoint_slice.go:156 +0x759

1 occurrence. Sample stack trace:
sync.runtime_SemacquireMutex(0xc000880000?, 0x20?, 0x21?)
	/usr/local/go/src/runtime/sema.go:71 +0x25
sync.(*RWMutex).RLock(...)
	/usr/local/go/src/sync/rwmutex.go:63
github.com/cilium/cilium/pkg/ipcache.(*metadata).get(0xc00104f770?, {0xc0069e9160?, 0x9?})
	/go/src/github.com/cilium/cilium/pkg/ipcache/metadata.go:90 +0x66
github.com/cilium/cilium/pkg/ipcache.(*IPCache).GetIDMetadataByIP(...)
	/go/src/github.com/cilium/cilium/pkg/ipcache/metadata.go:86
github.com/cilium/cilium/pkg/ipcache.(*IPCache).AllocateCIDRs(0xc001003580, {0xc008680cf0, 0x2, 0x0?}, {0x0, 0x0, 0x0?}, 0x0)
	/go/src/github.com/cilium/cilium/pkg/ipcache/cidr.go:57 +0x22b
github.com/cilium/cilium/pkg/k8s.RuleTranslator.generateToCidrFromEndpoint({0xc001003580, {{0xc005bb63c0, 0xa}, {0xc005bb6378, 0x7}}, {0xc008c15e00}, 0xc001905e60, 0x0, 0x1}, 0xc001f667e0, ...)
	/go/src/github.com/cilium/cilium/pkg/k8s/rule_translate.go:124 +0xb3
github.com/cilium/cilium/pkg/k8s.RuleTranslator.populateEgress({0xc001003580, {{0xc005bb63c0, 0xa}, {0xc005bb6378, 0x7}}, {0xc008c15e00}, 0xc001905e60, 0x0, 0x1}, 0xc001f667e0, ...)
	/go/src/github.com/cilium/cilium/pkg/k8s/rule_translate.go:62 +0x172
github.com/cilium/cilium/pkg/k8s.RuleTranslator.TranslateEgress({0xc001003580, {{0xc005bb63c0, 0xa}, {0xc005bb6378, 0x7}}, {0xc008c15e00}, 0xc001905e60, 0x0, 0x1}, 0xc001f667e0, ...)
	/go/src/github.com/cilium/cilium/pkg/k8s/rule_translate.go:51 +0x18e
github.com/cilium/cilium/pkg/k8s.RuleTranslator.Translate({0xc001003580, {{0xc005bb63c0, 0xa}, {0xc005bb6378, 0x7}}, {0xc008c15e00}, 0xc001905e60, 0x0, 0x1}, 0xc001c66750, ...)
	/go/src/github.com/cilium/cilium/pkg/k8s/rule_translate.go:33 +0x117
github.com/cilium/cilium/pkg/policy.(*Repository).TranslateRules(0xc0003f5490, {0x3440260, 0xc0025fd280})
	/go/src/github.com/cilium/cilium/pkg/policy/repository.go:627 +0x10b
github.com/cilium/cilium/pkg/k8s/watchers.(*K8sWatcher).k8sServiceHandler.func1({0x0, {{0xc005bb63c0, 0xa}, {0xc005bb6378, 0x7}}, 0xc0015f0c80, 0x0, 0xc003165f50, 0xc001bc9c80})
	/go/src/github.com/cilium/cilium/pkg/k8s/watchers/watcher.go:586 +0xc9e
github.com/cilium/cilium/pkg/k8s/watchers.(*K8sWatcher).k8sServiceHandler(0xc001586d80)
	/go/src/github.com/cilium/cilium/pkg/k8s/watchers/watcher.go:623 +0x9f
created by github.com/cilium/cilium/pkg/k8s/watchers.(*K8sWatcher).RunK8sServiceHandler
	/go/src/github.com/cilium/cilium/pkg/k8s/watchers/watcher.go:629 +0x56

This commit fixes the deadlock by moving the IPCache allocation out of the k8s.RuleTranslator
implementation, making the caller responsible for updating the IPCache. This removes the nested
policyRepo -> ipcache locking in the translator: in situations like the one described, the
translation no longer depends on the ipcache.
Codepath 2 will be able to complete, releasing the policyRepo lock and allowing codepath 1 to proceed.
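
In outline, the corrected flow looks like the following minimal sketch (stand-in
types, not Cilium's real API; only the lock ordering matters): translation runs
under the policyRepo lock and merely records the prefixes, and the caller touches
the ipcache only after that lock is released.

// A minimal, hypothetical model of the corrected flow. Repository,
// TranslationResult and IPCache are stand-ins, not Cilium's real types;
// only the lock ordering is the point.
package main

import (
	"fmt"
	"sync"
)

type TranslationResult struct {
	PrefixesToAdd     []string
	PrefixesToRelease []string
}

// Repository models pkg/policy.Repository: translation runs under its lock,
// but it no longer reaches into the IPCache while holding it.
type Repository struct {
	mu sync.Mutex
}

func (r *Repository) TranslateRules(backendIPs []string) TranslationResult {
	r.mu.Lock()
	defer r.mu.Unlock()
	// The translator now only records which prefixes the caller must
	// allocate or release, instead of updating the IPCache itself.
	res := TranslationResult{}
	for _, ip := range backendIPs {
		res.PrefixesToAdd = append(res.PrefixesToAdd, ip+"/32")
	}
	return res
}

// IPCache models pkg/ipcache.IPCache with its own, independent lock.
type IPCache struct {
	mu       sync.Mutex
	prefixes map[string]struct{}
}

func (c *IPCache) AllocateCIDRs(prefixes []string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, p := range prefixes {
		c.prefixes[p] = struct{}{}
	}
}

func main() {
	repo := &Repository{}
	ipc := &IPCache{prefixes: map[string]struct{}{}}

	// k8sServiceHandler, post-fix: translate first (repository lock only),
	// then update the IPCache (ipcache lock only). The two locks are never
	// nested, so the inversion with the CES watcher path cannot occur.
	result := repo.TranslateRules([]string{"10.0.0.1", "10.0.0.2"})
	ipc.AllocateCIDRs(result.PrefixesToAdd)
	fmt.Println("allocated", len(ipc.prefixes), "backend prefixes")
}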

Note: The other callers of k8s.RuleTranslator, in the endpoint watcher handler, do not use
rule translation prefixes, so they do not need the ipcache logic added to k8sServiceHandler.

Signed-off-by: Tom Hadlaw <tom.hadlaw@isovalent.com>
Reported-by: Michi Mutsuzaki <michi@isovalent.com>
tommyp1ckles authored and joestringer committed Aug 30, 2022
1 parent 5928807 commit 4b87ccc
Showing 4 changed files with 67 additions and 36 deletions.
60 changes: 33 additions & 27 deletions pkg/k8s/rule_translate.go
@@ -19,7 +19,6 @@ var _ policy.Translator = RuleTranslator{}
// Translate populates/depopulates given rule with ToCIDR rules
// Based on provided service/endpoint
type RuleTranslator struct {
ipcache *ipcache.IPCache
Service ServiceID
Endpoint Endpoints
ServiceLabels map[string]string
@@ -59,8 +58,10 @@ func (k RuleTranslator) TranslateEgress(r *api.EgressRule, result *policy.Transl
func (k RuleTranslator) populateEgress(r *api.EgressRule, result *policy.TranslationResult) error {
for _, service := range r.ToServices {
if k.serviceMatches(service) {
if err := k.generateToCidrFromEndpoint(r, k.Endpoint, k.AllocatePrefixes); err != nil {
if backendPrefixes, err := k.generateToCidrFromEndpoint(r, k.Endpoint, k.AllocatePrefixes); err != nil {
return err
} else {
result.PrefixesToAdd = append(result.PrefixesToAdd, backendPrefixes...)
}
// TODO: generateToPortsFromEndpoint when ToPorts and ToCIDR are compatible
}
@@ -74,8 +75,10 @@ func (k RuleTranslator) depopulateEgress(r *api.EgressRule, result *policy.Trans
// counting rules twice
result.NumToServicesRules++
if k.serviceMatches(service) {
if err := k.deleteToCidrFromEndpoint(r, k.Endpoint, k.AllocatePrefixes); err != nil {
if prefixesToRelease, err := k.deleteToCidrFromEndpoint(r, k.Endpoint, k.AllocatePrefixes); err != nil {
return err
} else {
result.PrefixesToRelease = append(result.PrefixesToRelease, prefixesToRelease...)
}
// TODO: generateToPortsFromEndpoint when ToPorts and ToCIDR are compatible
}
@@ -105,24 +108,18 @@ func (k RuleTranslator) serviceMatches(service api.Service) bool {
func (k RuleTranslator) generateToCidrFromEndpoint(
egress *api.EgressRule,
endpoint Endpoints,
allocatePrefixes bool) error {
allocatePrefixes bool) ([]*net.IPNet, error) {

var prefixes []*net.IPNet
var err error
// allocatePrefixes if true here implies that this translation is
// occurring after policy import. This means that the CIDRs were not
// known at that time, so the IPCache hasn't been informed about them.
// In this case, it's the job of this Translator to notify the IPCache.
if allocatePrefixes {
prefixes, err := endpoint.CIDRPrefixes()
prefixes, err = endpoint.CIDRPrefixes()
if err != nil {
return err
}
// TODO: Collect new identities to be upserted to the ipcache only after all
// endpoints have been regenerated later. This would make sure that any CIDRs in the
// policy would be first pushed to the endpoint policies and then to the ipcache to
// avoid traffic mapping to an ID that the endpoint policy maps do not know about
// yet.
if _, err := k.ipcache.AllocateCIDRs(prefixes, nil, nil); err != nil {
return err
return nil, err
}
}

@@ -131,14 +128,14 @@ func (k RuleTranslator) generateToCidrFromEndpoint(
for ip := range endpoint.Backends {
epIP := net.ParseIP(ip)
if epIP == nil {
return fmt.Errorf("unable to parse ip: %s", ip)
return nil, fmt.Errorf("unable to parse ip: %s", ip)
}

found := false
for _, c := range egress.ToCIDRSet {
_, cidr, err := net.ParseCIDR(string(c.Cidr))
if err != nil {
return err
return nil, err
}
if cidr.Contains(epIP) {
found = true
@@ -153,7 +150,7 @@ func (k RuleTranslator) generateToCidrFromEndpoint(
})
}
}
return nil
return prefixes, nil
}

// deleteToCidrFromEndpoint takes an egress rule and removes ToCIDR rules
@@ -167,14 +164,15 @@ func (k RuleTranslator) generateToCidrFromEndpoint(
func (k RuleTranslator) deleteToCidrFromEndpoint(
egress *api.EgressRule,
endpoint Endpoints,
releasePrefixes bool) error {
releasePrefixes bool) ([]*net.IPNet, error) {

var toReleasePrefixes []*net.IPNet
delCIDRRules := make(map[int]*api.CIDRRule, len(egress.ToCIDRSet))

for ip := range endpoint.Backends {
epIP := net.ParseIP(ip)
if epIP == nil {
return fmt.Errorf("unable to parse ip: %s", ip)
return nil, fmt.Errorf("unable to parse ip: %s", ip)
}

for i, c := range egress.ToCIDRSet {
@@ -184,7 +182,7 @@ func (k RuleTranslator) deleteToCidrFromEndpoint(
}
_, cidr, err := net.ParseCIDR(string(c.Cidr))
if err != nil {
return err
return nil, err
}
// delete all generated CIDRs for a CIDR that match the given
// endpoint
@@ -200,16 +198,15 @@ func (k RuleTranslator) deleteToCidrFromEndpoint(
// If no rules were deleted we can do an early return here and avoid doing
// the useless operations below.
if len(delCIDRRules) == 0 {
return nil
return toReleasePrefixes, nil
}

if releasePrefixes {
delSlice := make([]api.CIDRRule, 0, len(egress.ToCIDRSet))
for _, delCIDRRule := range delCIDRRules {
delSlice = append(delSlice, *delCIDRRule)
}
prefixes := policy.GetPrefixesFromCIDRSet(delSlice)
k.ipcache.ReleaseCIDRIdentitiesByCIDR(prefixes)
toReleasePrefixes = policy.GetPrefixesFromCIDRSet(delSlice)
}

// if endpoint is not in CIDR or it's not generated it's ok to retain it
@@ -224,7 +221,7 @@ func (k RuleTranslator) deleteToCidrFromEndpoint(

egress.ToCIDRSet = newCIDRRules

return nil
return toReleasePrefixes, nil
}

// PreprocessRules translates rules that apply to headless services
@@ -244,6 +241,8 @@ func PreprocessRules(r api.Rules, cache *ServiceCache, ipcache *ipcache.IPCache)
eps := ep.GetEndpoints()
if eps != nil {
t := NewK8sTranslator(ipcache, ns, *eps, false, svc.Labels, false)
// We don't need to check the translation result here because the k8s
// RuleTranslator above sets allocatePrefixes to be false.
err := t.Translate(rule, &policy.TranslationResult{})
if err != nil {
return err
@@ -255,14 +254,21 @@ func PreprocessRules(r api.Rules, cache *ServiceCache, ipcache *ipcache.IPCache)
return nil
}

// NewK8sTranslator returns RuleTranslator
// NewK8sTranslator returns RuleTranslator.
// If allocatePrefixes is set to true, then translation calls will return
// prefixes that need to be allocated or deallocated.
func NewK8sTranslator(
ipcache *ipcache.IPCache,
serviceInfo ServiceID,
endpoint Endpoints,
revert bool,
labels map[string]string,
allocatePrefixes bool) RuleTranslator {

return RuleTranslator{ipcache, serviceInfo, endpoint, labels, revert, allocatePrefixes}
return RuleTranslator{
Service: serviceInfo,
Endpoint: endpoint,
ServiceLabels: labels,
Revert: revert,
AllocatePrefixes: allocatePrefixes,
}
}
26 changes: 17 additions & 9 deletions pkg/k8s/rule_translate_test.go
@@ -6,6 +6,7 @@
package k8s

import (
"net"
"sort"

. "gopkg.in/check.v1"
@@ -226,9 +227,9 @@ func (s *K8sSuite) TestGenerateToCIDRFromEndpoint(c *C) {
}

translator := NewK8sTranslator(ipcache.NewIPCache(nil), serviceInfo, endpointInfo, false, map[string]string{}, false)
err := translator.generateToCidrFromEndpoint(rule, endpointInfo, false)
prefixesToAllocate, err := translator.generateToCidrFromEndpoint(rule, endpointInfo, false)
c.Assert(err, IsNil)

c.Assert(len(prefixesToAllocate), Equals, 0, Commentf("if allocatePrefixes is false, then it should return nothing"))
cidrs := rule.ToCIDRSet.StringSlice()
sort.Strings(cidrs)
c.Assert(len(cidrs), Equals, 2)
@@ -238,8 +239,15 @@ func (s *K8sSuite) TestGenerateToCIDRFromEndpoint(c *C) {
})

// second run, to make sure there are no duplicates added
err = translator.generateToCidrFromEndpoint(rule, endpointInfo, false)
prefixesToAllocate, err = translator.generateToCidrFromEndpoint(rule, endpointInfo, true)
c.Assert(err, IsNil)
c.Assert(len(prefixesToAllocate), Equals, 2, Commentf("if allocatePrefixes is true, then it should return the list of prefixes to allocate"))
_, epIP1Prefix, err := net.ParseCIDR(epIP1 + "/32")
c.Assert(err, IsNil)
_, epIP2Prefix, err := net.ParseCIDR(epIP2 + "/32")
c.Assert(err, IsNil)
c.Assert(prefixesToAllocate[0].String(), Equals, epIP1Prefix.String())
c.Assert(prefixesToAllocate[1].String(), Equals, epIP2Prefix.String())

cidrs = rule.ToCIDRSet.StringSlice()
sort.Strings(cidrs)
@@ -249,12 +257,12 @@ func (s *K8sSuite) TestGenerateToCIDRFromEndpoint(c *C) {
epIP2 + "/32",
})

err = translator.deleteToCidrFromEndpoint(rule, endpointInfo, false)
_, err = translator.deleteToCidrFromEndpoint(rule, endpointInfo, false)
c.Assert(err, IsNil)
c.Assert(len(rule.ToCIDRSet), Equals, 0)

// third run, to make sure there are no duplicates added
err = translator.generateToCidrFromEndpoint(rule, endpointInfo, false)
_, err = translator.generateToCidrFromEndpoint(rule, endpointInfo, false)
c.Assert(err, IsNil)

cidrs = rule.ToCIDRSet.StringSlice()
@@ -266,7 +274,7 @@ func (s *K8sSuite) TestGenerateToCIDRFromEndpoint(c *C) {
})

// and one final delete
err = translator.deleteToCidrFromEndpoint(rule, endpointInfo, false)
_, err = translator.deleteToCidrFromEndpoint(rule, endpointInfo, false)
c.Assert(err, IsNil)
c.Assert(len(rule.ToCIDRSet), Equals, 0)
}
@@ -367,20 +375,20 @@ func (s *K8sSuite) TestDontDeleteUserRules(c *C) {
}

translator := NewK8sTranslator(ipcache.NewIPCache(nil), serviceInfo, endpointInfo, false, map[string]string{}, false)
err := translator.generateToCidrFromEndpoint(rule, endpointInfo, false)
_, err := translator.generateToCidrFromEndpoint(rule, endpointInfo, false)
c.Assert(err, IsNil)

c.Assert(len(rule.ToCIDRSet), Equals, 2)
c.Assert(string(rule.ToCIDRSet[1].Cidr), Equals, epIP+"/32")

// second run, to make sure there are no duplicates added
err = translator.generateToCidrFromEndpoint(rule, endpointInfo, false)
_, err = translator.generateToCidrFromEndpoint(rule, endpointInfo, false)
c.Assert(err, IsNil)

c.Assert(len(rule.ToCIDRSet), Equals, 2)
c.Assert(string(rule.ToCIDRSet[1].Cidr), Equals, epIP+"/32")

err = translator.deleteToCidrFromEndpoint(rule, endpointInfo, false)
_, err = translator.deleteToCidrFromEndpoint(rule, endpointInfo, false)
c.Assert(err, IsNil)
c.Assert(len(rule.ToCIDRSet), Equals, 1)
c.Assert(string(rule.ToCIDRSet[0].Cidr), Equals, string(userCIDR))
8 changes: 8 additions & 0 deletions pkg/k8s/watchers/watcher.go
@@ -591,6 +591,13 @@ func (k *K8sWatcher) k8sServiceHandler() {
break
} else if result.NumToServicesRules > 0 {
// Only trigger policy updates if ToServices rules are in effect
k.ipcache.ReleaseCIDRIdentitiesByCIDR(result.PrefixesToRelease)
_, err := k.ipcache.AllocateCIDRs(result.PrefixesToAdd, nil, nil)
if err != nil {
scopedLog.WithError(err).
Error("Unabled to allocate ipcache CIDR for toService rule")
break
}
k.policyManager.TriggerPolicyUpdates(true, "Kubernetes service endpoint added")
}

@@ -610,6 +617,7 @@ func (k *K8sWatcher) k8sServiceHandler() {
break
} else if result.NumToServicesRules > 0 {
// Only trigger policy updates if ToServices rules are in effect
k.ipcache.ReleaseCIDRIdentitiesByCIDR(result.PrefixesToRelease)
k.policyManager.TriggerPolicyUpdates(true, "Kubernetes service endpoint deleted")
}
}
9 changes: 9 additions & 0 deletions pkg/policy/repository.go
@@ -7,6 +7,7 @@ import (
"context"
"encoding/json"
"fmt"
"net"
"sync"
"sync/atomic"

@@ -612,6 +613,14 @@ type TranslationResult struct {
// NumToServicesRules is the number of ToServices rules processed while
// translating the rules
NumToServicesRules int

// PrefixesToAdd contains all egress CIDRs that are to be added
// for the translation.
PrefixesToAdd []*net.IPNet

// PrefixesToRelease contains all egress CIDRs that are to be released
// for the translation.
PrefixesToRelease []*net.IPNet
}

// TranslateRules traverses rules and applies provided translator to rules
Expand Down
