Skip to content

Commit

Permalink
endpoint: Store the highest skipped regeneration level
Browse files Browse the repository at this point in the history
Store the highest skipped regeneration level so that the regeneration
can be performed on the required level.

Suggested-by: Dan Wendlandt <dan@covalent.io>
Signed-off-by: Jarno Rajahalme <jarno@covalent.io>
  • Loading branch information
jrajahalme committed Apr 14, 2020
1 parent 5478154 commit b7bf8bc
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 78 deletions.
11 changes: 9 additions & 2 deletions daemon/cmd/endpoint.go
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cilium/cilium/pkg/api"
"github.com/cilium/cilium/pkg/endpoint"
endpointid "github.com/cilium/cilium/pkg/endpoint/id"
"github.com/cilium/cilium/pkg/endpoint/regeneration"
"github.com/cilium/cilium/pkg/k8s"
k8sConst "github.com/cilium/cilium/pkg/k8s/apis/cilium.io"
"github.com/cilium/cilium/pkg/labels"
Expand Down Expand Up @@ -440,8 +441,14 @@ func (h *patchEndpointID) Handle(params PatchEndpointIDParams) middleware.Respon
}

if reason != "" {
if err := ep.RegenerateWait(reason); err != nil {
return api.Error(PatchEndpointIDFailedCode, err)
regenMetadata := &regeneration.ExternalRegenerationMetadata{
Reason: reason,
RegenerationLevel: regeneration.RegenerateWithDatapathRewrite,
}
if !<-ep.Regenerate(regenMetadata) {
return api.Error(PatchEndpointIDFailedCode,
fmt.Errorf("error while regenerating endpoint."+
" For more info run: 'cilium endpoint get %d'", ep.ID))
}
// FIXME: Special return code to indicate regeneration happened?
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/endpoint/api.go
Expand Up @@ -553,6 +553,12 @@ func (e *Endpoint) ProcessChangeRequest(newEp *Endpoint, validPatchTransitionSta
reason = "Waiting on endpoint regeneration because identity is known while handling API PATCH"
case StateWaitingForIdentity:
reason = "Waiting on endpoint initial program regeneration while handling API PATCH"
default:
// Caller skips regeneration if reason == "". Bump the skipped regeneration level so that next
// regeneration will realise endpoint changes.
if e.skippedRegenerationLevel < regeneration.RegenerateWithDatapathRewrite {
e.skippedRegenerationLevel = regeneration.RegenerateWithDatapathRewrite
}
}
}

Expand Down
61 changes: 23 additions & 38 deletions pkg/endpoint/endpoint.go
Expand Up @@ -279,6 +279,12 @@ type Endpoint struct {

eventQueue *eventqueue.EventQueue

// skippedRegenerationLevel is the DatapathRegenerationLevel of the regeneration event that
// was skipped due to another regeneration event already being queued, as indicated by
// state. A lower-level current regeneration is bumped to this level to cover for the
// skipped regeneration levels.
skippedRegenerationLevel regeneration.DatapathRegenerationLevel

// DatapathConfiguration is the endpoint's datapath configuration as
// passed in via the plugin that created the endpoint, e.g. the CNI
// plugin which performed the plumbing will enable certain datapath
Expand Down Expand Up @@ -866,19 +872,14 @@ func (e *Endpoint) Update(cfg *models.EndpointConfigurationSpec) error {
for {
select {
case <-ticker.C:
if err := e.lockAlive(); err != nil {
regen, err := e.SetRegenerateStateIfAlive(regenCtx)
if err != nil {
return err
}
// Check endpoint state before attempting configuration update because
// configuration updates can only be applied when the endpoint is in
// specific states. See GH-3058.
stateTransitionSucceeded := e.setState(StateWaitingToRegenerate, regenCtx.Reason)
if stateTransitionSucceeded {
e.unlock()
if regen {
e.Regenerate(regenCtx)
return nil
}
e.unlock()
case <-timeout:
e.getLogger().Warning("timed out waiting for endpoint state to change")
return UpdateStateChangeError{fmt.Sprintf("unable to regenerate endpoint program because state transition to %s was unsuccessful; check `cilium endpoint log %d` for more information", StateWaitingToRegenerate, e.ID)}
Expand Down Expand Up @@ -1023,19 +1024,6 @@ func (e *Endpoint) leaveLocked(proxyWaitGroup *completion.WaitGroup, conf Delete
return errors
}

// RegenerateWait should only be called when endpoint's state has successfully
// been changed to "waiting-to-regenerate"
func (e *Endpoint) RegenerateWait(reason string) error {
	// Trigger a full datapath rewrite and block until the build queue
	// reports the outcome on the returned channel.
	metadata := &regeneration.ExternalRegenerationMetadata{
		Reason:            reason,
		RegenerationLevel: regeneration.RegenerateWithDatapathRewrite,
	}
	if success := <-e.Regenerate(metadata); success {
		return nil
	}
	return fmt.Errorf("error while regenerating endpoint."+
		" For more info run: 'cilium endpoint get %d'", e.ID)
}

// GetContainerName returns the name of the container for the endpoint.
func (e *Endpoint) GetContainerName() string {
e.unconditionalRLock()
Expand Down Expand Up @@ -1778,6 +1766,10 @@ func (e *Endpoint) identityLabelsChanged(ctx context.Context, myChangeRev int) e
}

readyToRegenerate := false
regenMetadata := &regeneration.ExternalRegenerationMetadata{
Reason: "updated security labels",
RegenerationLevel: regeneration.RegenerateWithDatapathRewrite,
}

// Regeneration is only triggered once the endpoint ID has been
// assigned. This ensures that on the initial creation, the endpoint is
Expand All @@ -1788,7 +1780,7 @@ func (e *Endpoint) identityLabelsChanged(ctx context.Context, myChangeRev int) e
// called, the controller calling identityLabelsChanged() will trigger
// the regeneration as soon as the identity is known.
if e.ID != 0 {
readyToRegenerate = e.setState(StateWaitingToRegenerate, "Triggering regeneration due to new identity")
readyToRegenerate = e.setRegenerateStateLocked(regenMetadata)
}

// Unconditionally force policy recomputation after a new identity has been
Expand All @@ -1798,10 +1790,7 @@ func (e *Endpoint) identityLabelsChanged(ctx context.Context, myChangeRev int) e
e.unlock()

if readyToRegenerate {
e.Regenerate(&regeneration.ExternalRegenerationMetadata{
Reason: "updated security labels",
RegenerationLevel: regeneration.RegenerateWithDatapathRewrite,
})
e.Regenerate(regenMetadata)
}

return nil
Expand Down Expand Up @@ -2125,15 +2114,15 @@ func (e *Endpoint) GetProxyInfoByFields() (uint64, string, string, []string, str
// program to be generated for the first time.
// * otherwise, waits for the endpoint to complete its first full regeneration.
func (e *Endpoint) RegenerateAfterCreation(ctx context.Context, syncBuild bool) error {
if err := e.lockAlive(); err != nil {
return fmt.Errorf("endpoint was deleted while processing the request")
regenMetadata := &regeneration.ExternalRegenerationMetadata{
Reason: "Initial build on endpoint creation",
ParentContext: ctx,
RegenerationLevel: regeneration.RegenerateWithDatapathRewrite,
}

build := e.getState() == StateReady
if build {
e.setState(StateWaitingToRegenerate, "Identity is known at endpoint creation time")
build, err := e.SetRegenerateStateIfAlive(regenMetadata)
if err != nil {
return err
}
e.unlock()

if build {
// Do not synchronously regenerate the endpoint when first creating it.
Expand All @@ -2149,11 +2138,7 @@ func (e *Endpoint) RegenerateAfterCreation(ctx context.Context, syncBuild bool)
// is executed; if we waited for regeneration to be complete, including
// proxy configuration, this code would effectively deadlock addition
// of endpoints.
e.Regenerate(&regeneration.ExternalRegenerationMetadata{
Reason: "Initial build on endpoint creation",
ParentContext: ctx,
RegenerationLevel: regeneration.RegenerateWithDatapathRewrite,
})
e.Regenerate(regenMetadata)
}

// Wait for endpoint to be in "ready" state if specified in API call.
Expand Down
93 changes: 61 additions & 32 deletions pkg/endpoint/policy.go
Expand Up @@ -287,6 +287,14 @@ func (e *Endpoint) regenerate(context *regenerationContext) (retErr error) {
return fmt.Errorf("Skipping build due to invalid state: %s", e.state)
}

// Bump priority if higher priority event was skipped.
// This must be done in the same critical section as the state transition above.
if e.skippedRegenerationLevel > context.datapathRegenerationContext.regenerationLevel {
context.datapathRegenerationContext.regenerationLevel = e.skippedRegenerationLevel
}
// reset to the default lowest level
e.skippedRegenerationLevel = regeneration.Invalid

e.unlock()

stats.prepareBuild.Start()
Expand Down Expand Up @@ -427,31 +435,57 @@ func (e *Endpoint) updateRegenerationStatistics(context *regenerationContext, er
e.LogStatusOK(BPF, "Successfully regenerated endpoint program (Reason: "+context.Reason+")")
}

// SetRegenerateStateIfAlive tries to change the state of the endpoint for pending regeneration.
// Returns 'true' if 'e.Regenerate()' should be called after releasing the endpoint lock.
// Return 'false' if returned error is non-nil.
func (e *Endpoint) SetRegenerateStateIfAlive(regenMetadata *regeneration.ExternalRegenerationMetadata) (bool, error) {
	if err := e.lockAlive(); err != nil {
		// Endpoint is disconnecting or already gone; surface the failure in
		// the endpoint status log and report it to the caller.
		e.LogStatus(Policy, Failure, "Error while handling policy updates for endpoint: "+err.Error())
		return false, err
	}
	defer e.unlock()
	return e.setRegenerateStateLocked(regenMetadata), nil
}

// setRegenerateStateLocked tries to change the state of the endpoint for pending regeneration.
// returns 'true' if 'e.Regenerate()' should be called after releasing the endpoint lock.
func (e *Endpoint) setRegenerateStateLocked(regenMetadata *regeneration.ExternalRegenerationMetadata) bool {
	// A regeneration is already queued (or the endpoint is being restored).
	// Do not queue another one; instead remember the highest regeneration
	// level that was skipped so the pending regeneration can be bumped to
	// the required level.
	if state := e.getState(); state == StateRestoring || state == StateWaitingToRegenerate {
		if regenMetadata.RegenerationLevel > e.skippedRegenerationLevel {
			e.skippedRegenerationLevel = regenMetadata.RegenerationLevel
			e.logStatusLocked(Other, OK, fmt.Sprintf("Skipped duplicate endpoint regeneration level %s trigger due to %s", regenMetadata.RegenerationLevel.String(), regenMetadata.Reason))
		} else {
			e.logStatusLocked(Other, OK, fmt.Sprintf("Skipped duplicate endpoint regeneration trigger due to %s", regenMetadata.Reason))
		}
		return false
	}
	// No regeneration pending: attempt the state transition; its success
	// decides whether the caller should trigger e.Regenerate().
	return e.setState(StateWaitingToRegenerate, fmt.Sprintf("Triggering endpoint regeneration due to %s", regenMetadata.Reason))
}

// RegenerateIfAlive queue a regeneration of this endpoint into the build queue
// of the endpoint and returns a channel that is closed when the regeneration of
// the endpoint is complete. The channel returns:
// - false if the regeneration failed
// - true if the regeneration succeed
// - nothing and the channel is closed if the regeneration did not happen
func (e *Endpoint) RegenerateIfAlive(regenMetadata *regeneration.ExternalRegenerationMetadata) <-chan bool {
if err := e.lockAlive(); err != nil {
regen, err := e.SetRegenerateStateIfAlive(regenMetadata)
if err != nil {
log.WithError(err).Warnf("Endpoint disappeared while queued to be regenerated: %s", regenMetadata.Reason)
e.LogStatus(Policy, Failure, "Error while handling policy updates for endpoint: "+err.Error())
} else {
var regen bool
state := e.getState()
switch state {
case StateRestoring, StateWaitingToRegenerate:
e.logStatusLocked(Other, OK, fmt.Sprintf("Skipped duplicate endpoint regeneration trigger due to %s", regenMetadata.Reason))
regen = false
default:
regen = e.setState(StateWaitingToRegenerate, fmt.Sprintf("Triggering endpoint regeneration due to %s", regenMetadata.Reason))
}
e.unlock()
if regen {
// Regenerate logs status according to the build success/failure
return e.Regenerate(regenMetadata)
}
}
if regen {
// Regenerate logs status according to the build success/failure
return e.Regenerate(regenMetadata)
}

ch := make(chan bool)
Expand Down Expand Up @@ -560,20 +594,7 @@ func (e *Endpoint) startRegenerationFailureHandler() {
return nil
}

if err := e.lockAlive(); err != nil {
// We don't need to regenerate because the endpoint is
// disconnecting / is disconnected, exit gracefully.
return nil
}

stateTransitionSucceeded := e.setState(StateWaitingToRegenerate, reasonRegenRetry)
e.unlock()
if !stateTransitionSucceeded {
// Another regeneration has already been enqueued.
return nil
}

r := &regeneration.ExternalRegenerationMetadata{
regenMetadata := &regeneration.ExternalRegenerationMetadata{
// TODO (ianvernon) - is there a way we can plumb a parent
// context to a controller (e.g., endpoint.aliveCtx)?
ParentContext: ctx,
Expand All @@ -582,7 +603,15 @@ func (e *Endpoint) startRegenerationFailureHandler() {
// of the failure, simply that something failed.
RegenerationLevel: regeneration.RegenerateWithDatapathRewrite,
}
if success := <-e.Regenerate(r); success {
regen, _ := e.SetRegenerateStateIfAlive(regenMetadata)
if !regen {
// We don't need to regenerate because the endpoint is
// disconnecting / is disconnected, or another regeneration has
// already been enqueued. Exit gracefully.
return nil
}

if success := <-e.Regenerate(regenMetadata); success {
return nil
}
return fmt.Errorf("regeneration recovery failed")
Expand Down
13 changes: 7 additions & 6 deletions pkg/k8s/watchers/pod.go
Expand Up @@ -242,15 +242,16 @@ func (k *K8sWatcher) updateK8sPodV1(oldK8sPod, newK8sPod *types.Pod) error {
}

func realizePodAnnotationUpdate(podEP *endpoint.Endpoint) {
regenMetadata := &regeneration.ExternalRegenerationMetadata{
Reason: "annotations updated",
RegenerationLevel: regeneration.RegenerateWithoutDatapath,
}
// No need to log an error if the state transition didn't succeed,
// if it didn't succeed that means the endpoint is being deleted, or
// another regeneration has already been queued up for this endpoint.
stateTransitionSucceeded := podEP.SetState(endpoint.StateWaitingToRegenerate, "annotations updated")
if stateTransitionSucceeded {
podEP.Regenerate(&regeneration.ExternalRegenerationMetadata{
Reason: "annotations updated",
RegenerationLevel: regeneration.RegenerateWithoutDatapath,
})
regen, _ := podEP.SetRegenerateStateIfAlive(regenMetadata)
if regen {
podEP.Regenerate(regenMetadata)
}
}

Expand Down

0 comments on commit b7bf8bc

Please sign in to comment.