Skip to content

Commit

Permalink
endpoint: Enhance policy map sync
Browse files Browse the repository at this point in the history
When syncing policy map with dump, compare the desired policy map to
the dumped map for both deletes and adds. Record and log any
differences found.

Fixes: #14358
Fixes: #14357
Signed-off-by: Jarno Rajahalme <jarno@covalent.io>
  • Loading branch information
jrajahalme committed Jan 25, 2021
1 parent 4c827bf commit 9dc1350
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 152 deletions.
163 changes: 86 additions & 77 deletions pkg/endpoint/bpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -1238,65 +1238,95 @@ func (e *Endpoint) applyPolicyMapChanges() (proxyChanges bool, err error) {
// difference between the realized and desired policy state without
// dumping the bpf policy map.
func (e *Endpoint) syncPolicyMap() error {
e.policyDebug(logrus.Fields{
"policyRealized": e.realizedPolicy == e.desiredPolicy,
}, "syncPolicyMap")
// Apply pending policy map changes first so that desired map is up-to-date before
// we diff the maps below.
_, err := e.applyPolicyMapChanges()
if err != nil {
return err
}

// Nothing to do if the desired policy is already fully realized.
if e.realizedPolicy != e.desiredPolicy {
errors := 0

// Add policy map entries before deleting to avoid transient drops
err := e.addPolicyMapDelta()
if err != nil {
errors++
}

// Delete policy keys present in the realized state, but not present in the desired state
for keyToDelete := range e.realizedPolicy.PolicyMapState {
// If key that is in realized state is not in desired state, just remove it.
if _, ok := e.desiredPolicy.PolicyMapState[keyToDelete]; !ok {
if !e.deletePolicyKey(keyToDelete, false, nil) {
errors++
}
}
}

if errors > 0 {
return fmt.Errorf("syncPolicyMap failed")
}
if e.realizedPolicy == e.desiredPolicy {
e.policyDebug(nil, "syncPolicyMap(): not syncing as desired == realized")
return nil
}

// Still may have changes due to identities added and/or
// deleted after the desired policy was computed.
_, err := e.applyPolicyMapChanges()
// Diffs between the maps are expected here, so do not bother collecting them
_, _, err = e.syncDesiredPolicyMapWith(e.realizedPolicy.PolicyMapState, false)
return err
}

// addPolicyMapDelta adds new or updates existing bpf policy map state based
// on the difference between the realized and desired policy state without
// syncDesiredPolicyMapWith updates the bpf policy map state based on the
// difference between the given 'realized' and desired policy state without
// dumping the bpf policy map.
func (e *Endpoint) addPolicyMapDelta() error {
// Nothing to do if the desired policy is already fully realized.
if e.realizedPolicy == e.desiredPolicy {
return nil
}

func (e *Endpoint) syncDesiredPolicyMapWith(realized policy.MapState, withDiffs bool) (diffCount int, diffs []policy.MapChange, err error) {
errors := 0

// Add policy map entries before deleting to avoid transient drops
for keyToAdd, entry := range e.desiredPolicy.PolicyMapState {
if oldEntry, ok := e.realizedPolicy.PolicyMapState[keyToAdd]; !ok || !oldEntry.DatapathEqual(&entry) {
if oldEntry, ok := realized[keyToAdd]; !ok || !oldEntry.DatapathEqual(&entry) {
// Redirect entries currently come in with a dummy redirect port ("1"), replace it with
// the actual proxy port number. This is due to the fact that proxies may not yet have
// bound to a specific port when a proxy policy is first instantiated.
if entry.IsRedirectEntry() {
// Will change to 0 if on a sidecar
entry.ProxyPort = e.realizedRedirects[policy.ProxyIDFromKey(e.ID, keyToAdd)]
}
if !e.addPolicyKey(keyToAdd, entry, false) {
errors++
}
diffCount++
if withDiffs {
diffs = append(diffs, policy.MapChange{Add: true, Key: keyToAdd, Value: entry})
}
}
}

// Delete policy keys present in the realized state, but not present in the desired state
for keyToDelete := range realized {
// If key that is in realized state is not in desired state, just remove it.
if entry, ok := e.desiredPolicy.PolicyMapState[keyToDelete]; !ok {
if !e.deletePolicyKey(keyToDelete, false, nil) {
errors++
}
diffCount++
if withDiffs {
diffs = append(diffs, policy.MapChange{Add: false, Key: keyToDelete, Value: entry})
}
}
}

if errors > 0 {
return fmt.Errorf("updating desired PolicyMap state failed")
err = fmt.Errorf("syncPolicyMap failed")
}
return diffCount, diffs, err
}

return nil
func (e *Endpoint) dumpPolicyMapToMapState() (policy.MapState, error) {
currentMap := make(policy.MapState)

cb := func(key bpf.MapKey, value bpf.MapValue) {
// Convert key to host byte-order. ToHost() makes a copy.
keyHostOrder := key.(*policymap.PolicyKey).ToHost()
// Convert from policymap.Key to policy.Key
policyKey := policy.Key{
Identity: keyHostOrder.Identity,
DestPort: keyHostOrder.DestPort,
Nexthdr: keyHostOrder.Nexthdr,
TrafficDirection: keyHostOrder.TrafficDirection,
}
// Convert value to host byte-order. ToHost() makes a copy.
entryHostOrder := value.(*policymap.PolicyEntry).ToHost()
// Convert from policymap.PolicyEntry to policy.MapStateEntry.
policyEntry := policy.MapStateEntry{
ProxyPort: entryHostOrder.ProxyPort,
IsDeny: policymap.PolicyEntryFlags(entryHostOrder.GetFlags()).IsDeny(),
}
currentMap[policyKey] = policyEntry
}
err := e.policyMap.DumpWithCallback(cb)

return currentMap, err
}

// syncPolicyMapWithDump attempts to synchronize the PolicyMap for this endpoint to
Expand All @@ -1309,16 +1339,18 @@ func (e *Endpoint) addPolicyMapDelta() error {
// PolicyMap is unable to be dumped, or any update operation to the map fails.
// Must be called with e.mutex Lock()ed.
func (e *Endpoint) syncPolicyMapWithDump() error {

if e.realizedPolicy.PolicyMapState == nil {
e.realizedPolicy.PolicyMapState = make(policy.MapState)
}

if e.policyMap == nil {
return fmt.Errorf("not syncing PolicyMap state for endpoint because PolicyMap is nil")
}

currentMapContents, err := e.policyMap.DumpKeysToSlice()
// Apply pending policy map changes first so that desired map is up-to-date before
// we diff the maps below.
_, err := e.applyPolicyMapChanges()
if err != nil {
return err
}

currentMap, err := e.dumpPolicyMapToMapState()

// If map is unable to be dumped, attempt to close map and open it again.
// See GH-4229.
Expand All @@ -1339,44 +1371,21 @@ func (e *Endpoint) syncPolicyMapWithDump() error {
}

// Try to dump again, fail if error occurs.
currentMapContents, err = e.policyMap.DumpKeysToSlice()
currentMap, err = e.dumpPolicyMapToMapState()
if err != nil {
return err
}
}

errors := 0

// Log full policy map for every dump
e.policyDebug(logrus.Fields{
"dumpedPolicyMap": currentMapContents,
}, "syncPolicyMapWithDump")

for _, entry := range currentMapContents {
// Convert key to host-byte order for lookup in the desiredMapState.
keyHostOrder := entry.ToHost()

// Convert from policymap.Key to policy.Key
keyToDelete := policy.Key{
Identity: keyHostOrder.Identity,
DestPort: keyHostOrder.DestPort,
Nexthdr: keyHostOrder.Nexthdr,
TrafficDirection: keyHostOrder.TrafficDirection,
}

// If key that is in policy map is not in desired state, just remove it.
if _, ok := e.desiredPolicy.PolicyMapState[keyToDelete]; !ok {
e.getLogger().WithField(logfields.BPFMapKey, entry.String()).Debug("syncPolicyMapWithDump removing a bpf policy entry not in the desired state")
if !e.deletePolicyKey(keyToDelete, false, nil) {
errors++
}
}
}

err = e.addPolicyMapDelta()

if errors > 0 {
return fmt.Errorf("synchronizing desired PolicyMap state failed")
e.policyDebug(logrus.Fields{"dumpedPolicyMap": currentMap}, "syncPolicyMapWithDump")
// Diffs between the maps indicate an error in the policy map update logic.
// Collect and log diffs if policy logging is enabled.
diffCount, diffs, err := e.syncDesiredPolicyMapWith(currentMap, e.getPolicyLogger() != nil)

if diffCount > 0 {
e.getLogger().WithField(logfields.Count, diffCount).Warning("Policy map sync fixed errors, consider running with debug verbose = policy to get detailed dumps")
e.policyDebug(logrus.Fields{"dumpedDiffs": diffs}, "syncPolicyMapWithDump")
}

return err
Expand Down
24 changes: 12 additions & 12 deletions pkg/maps/policymap/policymap.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,18 @@ type PolicyEntry struct {
Bytes uint64 `align:"bytes"`
}

// ToHost returns a copy of entry with fields converted from network byte-order
// to host-byte-order if necessary.
func (pe *PolicyEntry) ToHost() PolicyEntry {
if pe == nil {
return PolicyEntry{}
}

n := *pe
n.ProxyPort = byteorder.NetworkToHost(n.ProxyPort).(uint16)
return n
}

func (pe *PolicyEntry) SetFlags(flags uint8) {
pe.Flags = flags
}
Expand Down Expand Up @@ -375,18 +387,6 @@ func (pm *PolicyMap) DumpToSlice() (PolicyEntriesDump, error) {
return entries, err
}

func (pm *PolicyMap) DumpKeysToSlice() ([]PolicyKey, error) {
var policyKeys []PolicyKey

cb := func(key bpf.MapKey, value bpf.MapValue) {
keyDump := *key.DeepCopyMapKey().(*PolicyKey)
policyKeys = append(policyKeys, keyDump)
}
err := pm.DumpWithCallback(cb)

return policyKeys, err
}

func newMap(path string) *PolicyMap {
mapType := bpf.MapTypeHash
flags := bpf.GetPreAllocateMapFlags(mapType)
Expand Down
54 changes: 0 additions & 54 deletions pkg/maps/policymap/policymap_privileged_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,32 +105,6 @@ func (pm *PolicyMapTestSuite) TestPolicyMapDumpToSlice(c *C) {
c.Assert(len(dump), Equals, 2)
}

func (pm *PolicyMapTestSuite) TestPolicyMapDumpKeysToSlice(c *C) {
c.Assert(testMap, NotNil)

fooEntry := newKey(1, 1, 1, 1)
err := testMap.AllowKey(fooEntry, 0)
c.Assert(err, IsNil)

dump, err := testMap.DumpKeysToSlice()
c.Assert(err, IsNil)
c.Assert(len(dump), Equals, 1)

// FIXME: It's weird that AllowKey() does the implicit byteorder
// conversion above. But not really a bug, so work around it.
fooEntry = fooEntry.ToNetwork()
c.Assert(dump[0], checker.DeepEquals, fooEntry)

// Special case: allow-all entry
barEntry := newKey(0, 0, 0, 0)
err = testMap.AllowKey(barEntry, 0)
c.Assert(err, IsNil)

dump, err = testMap.DumpKeysToSlice()
c.Assert(err, IsNil)
c.Assert(len(dump), Equals, 2)
}

func (pm *PolicyMapTestSuite) TestDeleteNonexistentKey(c *C) {
key := newKey(27, 80, u8proto.ANY, trafficdirection.Ingress)
err := testMap.Map.Delete(&key)
Expand Down Expand Up @@ -167,31 +141,3 @@ func (pm *PolicyMapTestSuite) TestDenyPolicyMapDumpToSlice(c *C) {
c.Assert(err, IsNil)
c.Assert(len(dump), Equals, 2)
}

func (pm *PolicyMapTestSuite) TestDenyPolicyMapDumpKeysToSlice(c *C) {
c.Assert(testMap, NotNil)

fooEntry := newKey(1, 1, 1, 1)
fooValue := newEntry(0, NewPolicyEntryFlag(&PolicyEntryFlagParam{IsDeny: true}))
err := testMap.DenyKey(fooEntry)
c.Assert(err, IsNil)

dump, err := testMap.DumpToSlice()
c.Assert(err, IsNil)
c.Assert(len(dump), Equals, 1)

// FIXME: It's weird that AllowKey() does the implicit byteorder
// conversion above. But not really a bug, so work around it.
fooEntry = fooEntry.ToNetwork()
c.Assert(dump[0].Key, checker.DeepEquals, fooEntry)
c.Assert(dump[0].PolicyEntry, checker.DeepEquals, fooValue)

// Special case: deny-all entry
barEntry := newKey(0, 0, 0, 0)
err = testMap.DenyKey(barEntry)
c.Assert(err, IsNil)

dump, err = testMap.DumpToSlice()
c.Assert(err, IsNil)
c.Assert(len(dump), Equals, 2)
}
18 changes: 9 additions & 9 deletions pkg/policy/mapstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,9 +476,9 @@ type MapChanges struct {
}

type MapChange struct {
add bool // false deletes
key Key
value MapStateEntry
Add bool // false deletes
Key Key
Value MapStateEntry
}

// AccumulateMapChanges accumulates the given changes to the
Expand Down Expand Up @@ -515,11 +515,11 @@ func (mc *MapChanges) AccumulateMapChanges(cs CachedSelector, adds, deletes []id
mc.mutex.Lock()
for _, id := range adds {
key.Identity = id.Uint32()
mc.changes = append(mc.changes, MapChange{true, key, value})
mc.changes = append(mc.changes, MapChange{Add: true, Key: key, Value: value})
}
for _, id := range deletes {
key.Identity = id.Uint32()
mc.changes = append(mc.changes, MapChange{false, key, value})
mc.changes = append(mc.changes, MapChange{Add: false, Key: key, Value: value})
}
mc.mutex.Unlock()
}
Expand All @@ -532,15 +532,15 @@ func (mc *MapChanges) consumeMapChanges(policyMapState MapState) (adds, deletes
deletes = make(MapState, len(mc.changes))

for i := range mc.changes {
if mc.changes[i].add {
if mc.changes[i].Add {
// insert but do not allow non-redirect entries to overwrite a redirect entry,
// nor allow non-deny entries to overwrite deny entries.
// Collect the incremental changes to the overall state in 'mc.adds' and 'mc.deletes'.
policyMapState.denyPreferredInsertWithChanges(mc.changes[i].key, mc.changes[i].value, adds, deletes)
policyMapState.denyPreferredInsertWithChanges(mc.changes[i].Key, mc.changes[i].Value, adds, deletes)
} else {
// Delete the contribution of this cs to the key and collect incremental changes
for cs := range mc.changes[i].value.selectors { // get the sole selector
policyMapState.deleteKeyWithChanges(mc.changes[i].key, cs, adds, deletes)
for cs := range mc.changes[i].Value.selectors { // get the sole selector
policyMapState.deleteKeyWithChanges(mc.changes[i].Key, cs, adds, deletes)
}
}
}
Expand Down

0 comments on commit 9dc1350

Please sign in to comment.