Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

endpoint: do not export endpoint locking functions #9142

Merged
merged 10 commits into from Sep 9, 2019
4 changes: 1 addition & 3 deletions cilium-health/launch/endpoint.go
Expand Up @@ -334,11 +334,9 @@ func LaunchAsEndpoint(baseCtx context.Context, owner regeneration.Owner, n *node
return nil, fmt.Errorf("Error while adding endpoint: %s", err)
}

if err := ep.LockAlive(); err != nil {
if err := ep.PinDatapathMap(); err != nil {
return nil, err
}
ep.PinDatapathMap()
ep.Unlock()

// Give the endpoint a security identity
ctx, cancel := context.WithTimeout(baseCtx, LaunchTime)
Expand Down
7 changes: 0 additions & 7 deletions daemon/endpoint.go
Expand Up @@ -259,19 +259,12 @@ func (d *Daemon) createEndpoint(ctx context.Context, epTemplate *models.Endpoint
default:
}

if err := ep.LockAlive(); err != nil {
return d.errorDuringCreation(ep, fmt.Errorf("endpoint was deleted while processing the request"))
}

// Now that we have ep.ID we can pin the map from this point. This
// also has to happen before the first build took place.
if err = ep.PinDatapathMap(); err != nil {
ep.Unlock()
return d.errorDuringCreation(ep, fmt.Errorf("unable to pin datapath maps: %s", err))
}

ep.Unlock()

cfunc := func() {
// Only used for CRI-O since it does not support events.
if d.workloadsEventsCh != nil && ep.GetContainerID() != "" {
Expand Down
22 changes: 1 addition & 21 deletions daemon/endpoint_test.go
Expand Up @@ -76,27 +76,7 @@ func (ds *DaemonSuite) TestEndpointAddNoLabels(c *C) {
c.Assert(err, IsNil)
c.Assert(ep.OpLabels.IdentityLabels(), checker.DeepEquals, expectedLabels)

// Check that the endpoint received the reserved identity for the
// reserved:init entities.
timeout := time.NewTimer(3 * time.Second)
defer timeout.Stop()
tick := time.NewTicker(200 * time.Millisecond)
defer tick.Stop()
var secID *identity.Identity
Loop:
for {
select {
case <-timeout.C:
break Loop
case <-tick.C:
ep.UnconditionalRLock()
secID = ep.SecurityIdentity
ep.RUnlock()
if secID != nil {
break Loop
}
}
}
secID := ep.WaitForIdentity(3 * time.Second)
c.Assert(secID, Not(IsNil))
c.Assert(secID.ID, Equals, identity.ReservedIdentityInit)
}
Expand Down
8 changes: 2 additions & 6 deletions daemon/policy_test.go
Expand Up @@ -135,9 +135,7 @@ func (ds *DaemonSuite) prepareEndpoint(c *C, identity *identity.Identity, qa boo
}
e.SetIdentity(identity, true)

e.UnconditionalLock()
ready := e.SetStateLocked(endpoint.StateWaitingToRegenerate, "test")
e.Unlock()
ready := e.SetState(endpoint.StateWaitingToRegenerate, "test")
c.Assert(ready, Equals, true)
buildSuccess := <-e.Regenerate(regenerationMetadata)
c.Assert(buildSuccess, Equals, true)
Expand All @@ -146,9 +144,7 @@ func (ds *DaemonSuite) prepareEndpoint(c *C, identity *identity.Identity, qa boo
}

func (ds *DaemonSuite) regenerateEndpoint(c *C, e *endpoint.Endpoint) {
e.UnconditionalLock()
ready := e.SetStateLocked(endpoint.StateWaitingToRegenerate, "test")
e.Unlock()
ready := e.SetState(endpoint.StateWaitingToRegenerate, "test")
c.Assert(ready, Equals, true)
buildSuccess := <-e.Regenerate(regenerationMetadata)
c.Assert(buildSuccess, Equals, true)
Expand Down
2 changes: 1 addition & 1 deletion daemon/state.go
Expand Up @@ -143,7 +143,7 @@ func (d *Daemon) restoreOldEndpoints(dir string, clean bool) (*endpointRestoreSt
for _, ep := range possibleEPs {
scopedLog := log.WithField(logfields.EndpointID, ep.ID)
if k8s.IsEnabled() {
scopedLog = scopedLog.WithField("k8sPodName", ep.GetK8sNamespaceAndPodNameLocked())
scopedLog = scopedLog.WithField("k8sPodName", ep.GetK8sNamespaceAndPodName())
}

restore, err := d.validateEndpoint(ep)
Expand Down
22 changes: 11 additions & 11 deletions pkg/endpoint/api.go
Expand Up @@ -38,7 +38,7 @@ import (
// GetLabelsModel returns the labels of the endpoint in their representation
// for the Cilium API. Returns an error if the Endpoint is being deleted.
func (e *Endpoint) GetLabelsModel() (*models.LabelConfiguration, error) {
if err := e.RLockAlive(); err != nil {
if err := e.rlockAlive(); err != nil {
return nil, err
}
spec := &models.LabelConfigurationSpec{
Expand All @@ -54,7 +54,7 @@ func (e *Endpoint) GetLabelsModel() (*models.LabelConfiguration, error) {
Disabled: e.OpLabels.Disabled.GetModel(),
},
}
e.RUnlock()
e.runlock()
return &cfg, nil
}

Expand Down Expand Up @@ -127,7 +127,7 @@ func NewEndpointFromChangeModel(owner regeneration.Owner, base *models.EndpointC
ep.SetDefaultOpts(option.Config.Opts)

ep.UpdateLogger(nil)
ep.SetStateLocked(string(base.State), "Endpoint creation")
ep.setState(string(base.State), "Endpoint creation")

return ep, nil
}
Expand Down Expand Up @@ -196,7 +196,7 @@ func (e *Endpoint) GetModelRLocked() *models.Endpoint {
ContainerName: e.ContainerName,
DockerEndpointID: e.DockerEndpointID,
DockerNetworkID: e.DockerNetworkID,
PodName: e.GetK8sNamespaceAndPodNameLocked(),
PodName: e.getK8sNamespaceAndPodName(),
},
// FIXME GH-3280 When we begin returning endpoint revisions this should
// change to return the configured and in-datapath policies.
Expand Down Expand Up @@ -450,10 +450,10 @@ func (e *Endpoint) ProcessChangeRequest(newEp *Endpoint, validPatchTransitionSta
reason string
)

if err := e.LockAlive(); err != nil {
if err := e.lockAlive(); err != nil {
return "", err
}
defer e.Unlock()
defer e.unlock()

if newEp.ifIndex != 0 && e.ifIndex != newEp.ifIndex {
e.ifIndex = newEp.ifIndex
Expand All @@ -474,7 +474,7 @@ func (e *Endpoint) ProcessChangeRequest(newEp *Endpoint, validPatchTransitionSta
validPatchTransitionState &&
e.GetStateLocked() != StateWaitingForIdentity {
// Will not change state if the current state does not allow the transition.
if e.SetStateLocked(StateWaitingForIdentity, "Update endpoint from API PATCH") {
if e.setState(StateWaitingForIdentity, "Update endpoint from API PATCH") {
changed = true
}
}
Expand Down Expand Up @@ -505,7 +505,7 @@ func (e *Endpoint) ProcessChangeRequest(newEp *Endpoint, validPatchTransitionSta
// If desired state is waiting-for-identity but identity is already
// known, bump it to ready state immediately to force re-generation
if e.GetStateLocked() == StateWaitingForIdentity && e.SecurityIdentity != nil {
e.SetStateLocked(StateReady, "Preparing to force endpoint regeneration because identity is known while handling API PATCH")
e.setState(StateReady, "Preparing to force endpoint regeneration because identity is known while handling API PATCH")
changed = true
}

Expand All @@ -518,7 +518,7 @@ func (e *Endpoint) ProcessChangeRequest(newEp *Endpoint, validPatchTransitionSta

// Transition to waiting-to-regenerate if ready.
if e.GetStateLocked() == StateReady {
e.SetStateLocked(StateWaitingToRegenerate, "Forcing endpoint regeneration because identity is known while handling API PATCH")
e.setState(StateWaitingToRegenerate, "Forcing endpoint regeneration because identity is known while handling API PATCH")
}

switch e.GetStateLocked() {
Expand Down Expand Up @@ -552,10 +552,10 @@ func (e *Endpoint) GetConfigurationStatus() *models.EndpointConfigurationStatus
// provided labels. Returns labels that were added and deleted. Returns an
// error if the endpoint is being deleted.
func (e *Endpoint) ApplyUserLabelChanges(lbls labels.Labels) (add, del labels.Labels, err error) {
if err := e.RLockAlive(); err != nil {
if err := e.rlockAlive(); err != nil {
return nil, nil, err
}
defer e.RUnlock()
defer e.runlock()
add, del = e.OpLabels.SplitUserLabelChanges(lbls)
return
}
Expand Down
36 changes: 18 additions & 18 deletions pkg/endpoint/bpf.go
Expand Up @@ -437,12 +437,12 @@ func (e *Endpoint) regenerateBPF(regenContext *regenerationContext) (revnum uint
}

stats.waitingForLock.Start()
err = e.LockAlive()
err = e.lockAlive()
stats.waitingForLock.End(err == nil)
if err != nil {
return 0, compilationExecuted, err
}
defer e.Unlock()
defer e.unlock()

e.ctCleaned = true

Expand Down Expand Up @@ -521,13 +521,13 @@ func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext) (pr
datapathRegenCtxt := regenContext.datapathRegenerationContext

stats.waitingForLock.Start()
err := e.LockAlive()
err := e.lockAlive()
stats.waitingForLock.End(err == nil)
if err != nil {
return err
}

defer e.Unlock()
defer e.unlock()

currentDir := datapathRegenCtxt.currentDir
nextDir := datapathRegenCtxt.nextDir
Expand Down Expand Up @@ -701,12 +701,12 @@ func (e *Endpoint) finalizeProxyState(regenContext *regenerationContext, err err
if err == nil {
// Always execute the finalization code, even if the endpoint is
// terminating, in order to properly release resources.
e.UnconditionalLock()
e.unconditionalLock()
e.getLogger().Debug("Finalizing successful endpoint regeneration")
datapathRegenCtx.finalizeList.Finalize()
e.Unlock()
e.unlock()
} else {
if err := e.LockAlive(); err != nil {
if err := e.lockAlive(); err != nil {
e.getLogger().WithError(err).Debug("Skipping unnecessary reverting of endpoint regeneration changes")
return
}
Expand All @@ -715,7 +715,7 @@ func (e *Endpoint) finalizeProxyState(regenContext *regenerationContext, err err
e.getLogger().WithError(err).Error("Reverting endpoint regeneration changes failed")
}
e.getLogger().Debug("Finished reverting endpoint changes after BPF regeneration failed")
e.Unlock()
e.unlock()
}
}

Expand Down Expand Up @@ -807,9 +807,9 @@ func (e *Endpoint) scrubIPsInConntrackTableLocked() {
}

func (e *Endpoint) scrubIPsInConntrackTable() {
e.UnconditionalLock()
e.unconditionalLock()
e.scrubIPsInConntrackTableLocked()
e.Unlock()
e.unlock()
}

// SkipStateClean can be called on a endpoint before its first build to skip
Expand All @@ -820,9 +820,9 @@ func (e *Endpoint) scrubIPsInConntrackTable() {
// The endpoint lock must NOT be held.
func (e *Endpoint) SkipStateClean() {
// Mark conntrack as already cleaned
e.UnconditionalLock()
e.unconditionalLock()
e.ctCleaned = true
e.Unlock()
e.unlock()
}

func (e *Endpoint) deletePolicyKey(keyToDelete policy.Key, incremental bool) bool {
Expand Down Expand Up @@ -1074,10 +1074,10 @@ func (e *Endpoint) syncPolicyMapController() {
// Failure to lock is not an error, it means
// that the endpoint was disconnected and we
// should exit gracefully.
if err := e.LockAlive(); err != nil {
if err := e.lockAlive(); err != nil {
return controller.NewExitReason("Endpoint disappeared")
}
defer e.Unlock()
defer e.unlock()
return e.syncPolicyMapWithDump()
},
RunInterval: 1 * time.Minute,
Expand Down Expand Up @@ -1160,18 +1160,18 @@ func (e *Endpoint) FinishIPVLANInit(netNsPath string) error {
}

// Just ignore if the endpoint is dying
if err := e.LockAlive(); err != nil {
if err := e.lockAlive(); err != nil {
return nil
}
defer e.Unlock()
defer e.unlock()

// No need to finish IPVLAN initialization for Docker if the endpoint isn't
// running with Docker.
if e.DockerNetworkID == "" {
return nil
}

if e.isDatapathMapPinnedLocked() {
if e.isDatapathMapPinned {
// The datapath map is pinned which implies that the post-initialization
// for the ipvlan slave has been successfully performed
return nil
Expand All @@ -1188,7 +1188,7 @@ func (e *Endpoint) FinishIPVLANInit(netNsPath string) error {
unix.Close(mapFD)
}()

if err = e.setDatapathMapIDAndPinMapLocked(mapID); err != nil {
if err = e.setDatapathMapIDAndPinMap(mapID); err != nil {
return fmt.Errorf("Unable to pin datapath map: %s", err)
}

Expand Down