Skip to content

Commit

Permalink
pkg/endpoint: always copy existing state during synchronization
Browse files Browse the repository at this point in the history
Endpoint regeneration goes to a lot of trouble to keep track of
whether the on-disk state has changed, only to avoid doing a couple
of readdir syscalls. The behaviour was added in commit f6c4385
("pkg/endpoint: Keep BPF object files if compilation is skipped.")
and the message does not indicate that performance was of particular
concern.

Do the safe thing and always perform the state copy.

Signed-off-by: Lorenz Bauer <lmb@isovalent.com>
  • Loading branch information
lmb authored and nathanjsweet committed May 13, 2024
1 parent acf3141 commit 900c846
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 56 deletions.
75 changes: 36 additions & 39 deletions pkg/endpoint/bpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,11 +514,9 @@ func (e *Endpoint) removeOldRedirects(desiredRedirects, realizedRedirects map[st
// Returns the policy revision number at the time the regeneration was called,
// whether the new state dir is populated with all new BPF state files,
// and an error if something failed.
func (e *Endpoint) regenerateBPF(regenContext *regenerationContext) (revnum uint64, stateDirComplete bool, reterr error) {
func (e *Endpoint) regenerateBPF(regenContext *regenerationContext) (revnum uint64, reterr error) {
var (
err error
compilationExecuted bool
headerfileChanged bool
err error
)

stats := &regenContext.Stats
Expand All @@ -539,7 +537,7 @@ func (e *Endpoint) regenerateBPF(regenContext *regenerationContext) (revnum uint
// below (runPreCompilationSteps()), but this caused a deadlock with the
// IPCache. Therefore, we obtain the DNSRules outside the critical section.
rules := e.owner.GetDNSRules(e.ID)
headerfileChanged, err = e.runPreCompilationSteps(regenContext, rules)
err = e.runPreCompilationSteps(regenContext, rules)

// Keep track of the side-effects of the regeneration that need to be
// reverted in case of failure.
Expand All @@ -553,12 +551,12 @@ func (e *Endpoint) regenerateBPF(regenContext *regenerationContext) (revnum uint
}()

if err != nil {
return 0, false, err
return 0, err
}

// No need to compile BPF in dry mode.
if e.isProperty(PropertyFakeEndpoint) {
return e.nextPolicyRevision, false, nil
return e.nextPolicyRevision, nil
}

// Skip BPF if the endpoint has no policy map
Expand All @@ -572,20 +570,20 @@ func (e *Endpoint) regenerateBPF(regenContext *regenerationContext) (revnum uint
err = e.waitForProxyCompletions(datapathRegenCtxt.proxyWaitGroup)
stats.proxyWaitForAck.End(err == nil)
if err != nil {
return 0, false, fmt.Errorf("Error while updating network policy: %w", err)
return 0, fmt.Errorf("Error while updating network policy: %w", err)
}

return e.nextPolicyRevision, false, nil
return e.nextPolicyRevision, nil
}

// Wait for connection tracking cleaning to complete
stats.waitingForCTClean.Start()
<-datapathRegenCtxt.ctCleaned
stats.waitingForCTClean.End(true)

compilationExecuted, err = e.realizeBPFState(regenContext)
err = e.realizeBPFState(regenContext)
if err != nil {
return datapathRegenCtxt.epInfoCache.revision, compilationExecuted, err
return datapathRegenCtxt.epInfoCache.revision, err
}

if !datapathRegenCtxt.epInfoCache.IsHost() || option.Config.EnableHostFirewall {
Expand All @@ -594,7 +592,7 @@ func (e *Endpoint) regenerateBPF(regenContext *regenerationContext) (revnum uint
err = lxcmap.WriteEndpoint(datapathRegenCtxt.epInfoCache)
stats.mapSync.End(err == nil)
if err != nil {
return 0, compilationExecuted, fmt.Errorf("Exposing new BPF failed: %w", err)
return 0, fmt.Errorf("Exposing new BPF failed: %w", err)
}
}

Expand All @@ -611,14 +609,14 @@ func (e *Endpoint) regenerateBPF(regenContext *regenerationContext) (revnum uint
err = e.waitForProxyCompletions(datapathRegenCtxt.proxyWaitGroup)
stats.proxyWaitForAck.End(err == nil)
if err != nil {
return 0, compilationExecuted, fmt.Errorf("error while configuring proxy redirects: %w", err)
return 0, fmt.Errorf("error while configuring proxy redirects: %w", err)
}

stats.waitingForLock.Start()
err = e.lockAlive()
stats.waitingForLock.End(err == nil)
if err != nil {
return 0, compilationExecuted, err
return 0, err
}
defer e.unlock()

Expand All @@ -637,14 +635,13 @@ func (e *Endpoint) regenerateBPF(regenContext *regenerationContext) (revnum uint
err = e.syncPolicyMap()
stats.mapSync.End(err == nil)
if err != nil {
return 0, compilationExecuted, fmt.Errorf("unable to regenerate policy because PolicyMap synchronization failed: %w", err)
return 0, fmt.Errorf("unable to regenerate policy because PolicyMap synchronization failed: %w", err)
}

stateDirComplete = headerfileChanged && compilationExecuted
return datapathRegenCtxt.epInfoCache.revision, stateDirComplete, err
return datapathRegenCtxt.epInfoCache.revision, err
}

func (e *Endpoint) realizeBPFState(regenContext *regenerationContext) (compilationExecuted bool, err error) {
func (e *Endpoint) realizeBPFState(regenContext *regenerationContext) (err error) {
stats := &regenContext.Stats
datapathRegenCtxt := regenContext.datapathRegenerationContext
debugEnabled := logging.CanLogAt(e.getLogger().Logger, logrus.DebugLevel)
Expand All @@ -669,7 +666,6 @@ func (e *Endpoint) realizeBPFState(regenContext *regenerationContext) (compilati
} else if !errors.Is(err, context.Canceled) {
e.getLogger().WithError(err).Error("Error while rewriting endpoint BPF program")
}
compilationExecuted = true
} else { // RegenerateWithDatapathLoad
err = e.owner.Datapath().Loader().ReloadDatapath(datapathRegenCtxt.completionCtx, datapathRegenCtxt.epInfoCache, &stats.datapathRealization)
if err == nil {
Expand All @@ -680,23 +676,23 @@ func (e *Endpoint) realizeBPFState(regenContext *regenerationContext) (compilati
}

if err != nil {
return compilationExecuted, err
return err
}
e.bpfHeaderfileHash = datapathRegenCtxt.bpfHeaderfilesHash
} else if debugEnabled {
e.getLogger().WithField(logfields.BPFHeaderfileHash, datapathRegenCtxt.bpfHeaderfilesHash).
Debug("BPF header file unchanged, skipping BPF compilation and installation")
}

return compilationExecuted, nil
return nil
}

// runPreCompilationSteps runs all of the regeneration steps that are necessary
// right before compiling the BPF for the given endpoint.
// The endpoint mutex must not be held.
//
// Returns whether the headerfile changed and/or an error.
func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rules restore.DNSRules) (headerfileChanged bool, preCompilationError error) {
func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rules restore.DNSRules) (preCompilationError error) {
stats := &regenContext.Stats
datapathRegenCtxt := regenContext.datapathRegenerationContext

Expand All @@ -707,14 +703,14 @@ func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rul
policyResult, err := e.regeneratePolicy()
stats.policyCalculation.End(err == nil)
if err != nil {
return false, fmt.Errorf("unable to regenerate policy for '%s': %w", e.StringID(), err)
return fmt.Errorf("unable to regenerate policy for '%s': %w", e.StringID(), err)
}

stats.waitingForLock.Start()
err = e.lockAlive()
stats.waitingForLock.End(err == nil)
if err != nil {
return false, err
return err
}

defer e.unlock()
Expand Down Expand Up @@ -750,7 +746,7 @@ func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rul
// where an unnecessary policy computation was skipped. In that case
// e.desiredPolicy == e.realizedPolicy also after this call.
if err := e.setDesiredPolicy(policyResult); err != nil {
return false, err
return err
}

// We cannot obtain the rules while e.mutex is held, because obtaining
Expand All @@ -766,17 +762,17 @@ func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rul
// Dry mode needs Network Policy Updates, but the proxy wait group must
// not be initialized, as there is no proxy ACKing the changes.
if err, _ = e.updateNetworkPolicy(nil); err != nil {
return false, err
return err
}

if err = e.writeHeaderfile(nextDir); err != nil {
return false, fmt.Errorf("Unable to write header file: %w", err)
return fmt.Errorf("Unable to write header file: %w", err)
}

if logging.CanLogAt(log.Logger, logrus.DebugLevel) {
log.WithField(logfields.EndpointID, e.ID).Debug("Skipping bpf updates due to dry mode")
}
return false, nil
return nil
}

// Endpoints without policy maps only need Network Policy Updates
Expand All @@ -796,25 +792,25 @@ func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rul
err, networkPolicyRevertFunc := e.updateNetworkPolicy(datapathRegenCtxt.proxyWaitGroup)
stats.proxyPolicyCalculation.End(err == nil)
if err != nil {
return false, err
return err
}
datapathRegenCtxt.revertStack.Push(networkPolicyRevertFunc)
}
return false, nil
return nil
}

if e.policyMap == nil {
e.policyMap, err = policymap.OpenOrCreate(e.policyMapPath())
if err != nil {
return false, err
return err
}

// Synchronize the in-memory realized state with BPF map entries,
// so that any potential discrepancy between desired and realized
// state would be dealt with by the following e.syncPolicyMap.
pm, err := e.dumpPolicyMapToMapState()
if err != nil {
return false, err
return err
}
e.realizedPolicy.SetPolicyMap(pm)
e.updatePolicyMapPressureMetric()
Expand Down Expand Up @@ -843,7 +839,7 @@ func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rul
desiredRedirects, err, finalizeFunc, revertFunc = e.addNewRedirects(datapathRegenCtxt.proxyWaitGroup)
stats.proxyConfiguration.End(err == nil)
if err != nil {
return false, err
return err
}
datapathRegenCtxt.finalizeList.Append(finalizeFunc)
datapathRegenCtxt.revertStack.Push(revertFunc)
Expand Down Expand Up @@ -873,7 +869,7 @@ func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rul
err, networkPolicyRevertFunc := e.updateNetworkPolicy(datapathRegenCtxt.proxyWaitGroup)
stats.proxyPolicyCalculation.End(err == nil)
if err != nil {
return false, err
return err
}
datapathRegenCtxt.revertStack.Push(networkPolicyRevertFunc)

Expand All @@ -887,7 +883,7 @@ func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rul
err = e.syncPolicyMap()
stats.mapSync.End(err == nil)
if err != nil {
return false, fmt.Errorf("unable to regenerate policy because PolicyMap synchronization failed: %w", err)
return fmt.Errorf("unable to regenerate policy because PolicyMap synchronization failed: %w", err)
}

// At this point, traffic is no longer redirected to the proxy for
Expand All @@ -901,7 +897,7 @@ func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rul
}

if e.isProperty(PropertySkipBPFRegeneration) {
return false, nil
return nil
}

stats.prepareBuild.Start()
Expand All @@ -911,6 +907,7 @@ func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rul

// Avoid BPF program compilation and installation if the headerfile for the endpoint
// or the node have not changed.
var headerfileChanged bool
datapathRegenCtxt.bpfHeaderfilesHash, err = e.owner.Datapath().Loader().EndpointHash(e)
if err != nil {
e.getLogger().WithError(err).Warn("Unable to hash header file")
Expand All @@ -929,7 +926,7 @@ func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rul
}
if datapathRegenCtxt.regenerationLevel >= regeneration.RegenerateWithDatapathRewrite {
if err := e.writeHeaderfile(nextDir); err != nil {
return false, fmt.Errorf("unable to write header file: %w", err)
return fmt.Errorf("unable to write header file: %w", err)
}
}

Expand All @@ -940,10 +937,10 @@ func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rul
datapathRegenCtxt.epInfoCache = e.createEpInfoCache(currentDir)
}
if datapathRegenCtxt.epInfoCache == nil {
return headerfileChanged, fmt.Errorf("Unable to cache endpoint information")
return fmt.Errorf("Unable to cache endpoint information")
}

return headerfileChanged, nil
return nil
}

func (e *Endpoint) finalizeProxyState(regenContext *regenerationContext, err error) {
Expand Down
16 changes: 4 additions & 12 deletions pkg/endpoint/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ func (e *Endpoint) NextDirectoryPath() string {
// It assumes that oldDir and newDir are an endpoint's old and new state
// directories (see synchronizeDirectories below).
func copyExistingState(oldDir, newDir string) error {
var err error

oldDirFile, err := os.Open(oldDir)
if err != nil {
return fmt.Errorf("failed to open old endpoint state dir: %w", err)
Expand Down Expand Up @@ -95,7 +93,7 @@ func copyExistingState(oldDir, newDir string) error {
// Returns the original regenerationError if regenerationError was non-nil,
// or if any updates to directories for the endpoint's directories fails.
// Must be called with endpoint.mutex Lock()ed.
func (e *Endpoint) synchronizeDirectories(origDir string, stateDirComplete bool) error {
func (e *Endpoint) synchronizeDirectories(origDir string) error {
scopedLog := e.getLogger()
debugLogEnabled := logging.CanLogAt(scopedLog.Logger, logrus.DebugLevel)

Expand All @@ -111,15 +109,9 @@ func (e *Endpoint) synchronizeDirectories(origDir string, stateDirComplete bool)
// An endpoint directory already exists. We need to back it up before attempting
// to move the new directory in its place so we can attempt recovery.
case !os.IsNotExist(err):
// If the compilation was skipped then we need to copy the old
// bpf objects into the new directory
if !stateDirComplete {
scopedLog.Debug("retaining existing state")
err := copyExistingState(origDir, tmpDir)
if err != nil {
scopedLog.WithError(err).Debugf("unable to copy state "+
"from %s into the new directory %s.", tmpDir, origDir)
}
if err := copyExistingState(origDir, tmpDir); err != nil {
scopedLog.WithError(err).Debugf("unable to copy state "+
"from %s into the new directory %s.", tmpDir, origDir)
}

// Atomically exchange the two directories.
Expand Down
9 changes: 4 additions & 5 deletions pkg/endpoint/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,6 @@ func (e *Endpoint) updateAndOverrideEndpointOptions(opts option.OptionMap) (opts
// Called with e.mutex UNlocked
func (e *Endpoint) regenerate(ctx *regenerationContext) (retErr error) {
var revision uint64
var stateDirComplete bool
var err error

ctx.Stats = regenerationStatistics{}
Expand Down Expand Up @@ -485,7 +484,7 @@ func (e *Endpoint) regenerate(ctx *regenerationContext) (retErr error) {
e.unlock()
}()

revision, stateDirComplete, err = e.regenerateBPF(ctx)
revision, err = e.regenerateBPF(ctx)

// Write full verifier log to the endpoint directory.
var ve *ebpf.VerifierError
Expand Down Expand Up @@ -516,13 +515,13 @@ func (e *Endpoint) regenerate(ctx *regenerationContext) (retErr error) {
return err
}

return e.updateRealizedState(stats, origDir, revision, stateDirComplete)
return e.updateRealizedState(stats, origDir, revision)
}

// updateRealizedState sets any realized state fields within the endpoint to
// be the desired state of the endpoint. This is only called after a successful
// regeneration of the endpoint.
func (e *Endpoint) updateRealizedState(stats *regenerationStatistics, origDir string, revision uint64, stateDirComplete bool) error {
func (e *Endpoint) updateRealizedState(stats *regenerationStatistics, origDir string, revision uint64) error {
// Update desired policy for endpoint because policy has now been realized
// in the datapath. PolicyMap state is not updated here, because that is
// performed in endpoint.syncPolicyMap().
Expand All @@ -538,7 +537,7 @@ func (e *Endpoint) updateRealizedState(stats *regenerationStatistics, origDir st
// Depending upon result of BPF regeneration (compilation executed),
// shift endpoint directories to match said BPF regeneration
// results.
err = e.synchronizeDirectories(origDir, stateDirComplete)
err = e.synchronizeDirectories(origDir)
if err != nil {
return fmt.Errorf("error synchronizing endpoint BPF program directories: %w", err)
}
Expand Down

0 comments on commit 900c846

Please sign in to comment.