Skip to content

Commit

Permalink
daemon/deletion-queue: Add support for batch delete
Browse files Browse the repository at this point in the history
This commit adds support for the batch delete API call in the endpoint
deletion queue. The endpoint deletion queue is used to enqueue endpoint
deletion requests in CNI DEL when cilium-agent is unavailable.

Support for batch deletion in the deletion queue is particularly
important, because if cilium-agent is unavailable, we cannot query it
for endpoints which belong to a particular container.

Before this commit, a endpoint deletion queue entry just contained a
string that represented a unique endpoint identifier. To support more
flexible batch deletion requests, we now instead serialize the full HTTP
API `EndpointDeleteRequest` as JSON. The new format is more flexible, as
it allows us to add more fields in the future. The old format (the
endpoint ID as a plain string) is still supported for
backwards-compatibility.

Co-authored-by: Tobias Klauser <tobias@isovalent.com>
Signed-off-by: Sebastian Wicki <sebastian@isovalent.com>
Signed-off-by: Tobias Klauser <tobias@isovalent.com>
  • Loading branch information
2 people authored and aditighag committed Aug 10, 2023
1 parent a0ba133 commit 7666581
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 36 deletions.
34 changes: 30 additions & 4 deletions daemon/cmd/deletion_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"path/filepath"

"github.com/cilium/cilium/api/v1/models"
"github.com/cilium/cilium/api/v1/server"
"github.com/cilium/cilium/pkg/defaults"
"github.com/cilium/cilium/pkg/hive"
Expand Down Expand Up @@ -94,12 +95,9 @@ func (dq *deletionQueue) processQueuedDeletes(d *Daemon, ctx context.Context) er
log.Infof("Processing %d queued deletion requests", len(files))

for _, file := range files {
// get the container id
epID, err := os.ReadFile(file)
err = d.processQueuedDeleteEntryLocked(file)
if err != nil {
log.WithError(err).WithField(logfields.Path, file).Error("Failed to read queued CNI deletion entry. Endpoint will not be deleted.")
} else {
_, _ = d.DeleteEndpoint(string(epID)) // this will log errors elsewhere
}

if err := os.Remove(file); err != nil {
Expand All @@ -124,3 +122,31 @@ func unlockAfterAPIServer(lc hive.Lifecycle, _ *server.Server, dq *deletionQueue
},
})
}

// processQueuedDeleteEntry processes the contents of the deletion queue entry
// in file. Requires the caller to hold the deletion queue lock.
func (d *Daemon) processQueuedDeleteEntryLocked(file string) error {
contents, err := os.ReadFile(file)
if err != nil {
return err
}

// Attempt to parse contents as a batch deletion request
var req models.EndpointBatchDeleteRequest
err = req.UnmarshalBinary(contents)
if err != nil {
// fall back on treating the file contents as an endpoint id (legacy behavior)
epID := string(contents)
log.
WithError(err).
WithField(logfields.EndpointID, epID).
Debug("Falling back on legacy deletion queue format")
_, _ = d.DeleteEndpoint(epID) // this will log errors elsewhere
return nil
}

// As with DeleteEndpoint, errors are logged elsewhere
_, _ = d.deleteEndpointByContainerID(req.ContainerID)

return nil
}
7 changes: 7 additions & 0 deletions pkg/client/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ func (c *Client) EndpointList() ([]*models.Endpoint, error) {
return resp.Payload, nil
}

// EndpointDeleteMany deletes multiple endpoints
func (c *Client) EndpointDeleteMany(req *models.EndpointBatchDeleteRequest) error {
params := endpoint.NewDeleteEndpointParams().WithEndpoint(req).WithTimeout(api.ClientTimeout)
_, _, err := c.Endpoint.DeleteEndpoint(params)
return Hint(err)
}

// EndpointGet returns endpoint by ID
func (c *Client) EndpointGet(id string) (*models.Endpoint, error) {
params := endpoint.NewGetEndpointIDParams().WithID(id).WithTimeout(api.ClientTimeout)
Expand Down
74 changes: 50 additions & 24 deletions plugins/cilium-cni/lib/deletion_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/sirupsen/logrus"

"github.com/cilium/cilium/api/v1/models"
"github.com/cilium/cilium/pkg/client"
"github.com/cilium/cilium/pkg/defaults"
"github.com/cilium/cilium/pkg/lock/lockfile"
Expand Down Expand Up @@ -117,36 +118,61 @@ func (dc *DeletionFallbackClient) EndpointDelete(id string) error {
// fall-back mode
if dc.lockfile != nil {
dc.logger.WithField(logfields.EndpointID, id).Info("Queueing deletion request for endpoint")
return dc.enqueueDeletionRequestLocked(id)
}

// sanity check: if there are too many queued deletes, just return error
// back up to the kubelet. If we get here, it's either because something
// has gone wrong with the kubelet, or the agent has been down for a very
// long time. To guard aganst long agent startup times (when it empties the
// queue), limit us to 256 queued deletions. If this does, indeed, overflow,
// then the kubelet will get the failure and eventually retry deletion.
files, err := os.ReadDir(defaults.DeleteQueueDir)
if err != nil {
dc.logger.WithField(logfields.Path, defaults.DeleteQueueDir).WithError(err).Error("failed to list deletion queue directory")
return err
}
if len(files) > maxDeletionFiles {
return fmt.Errorf("deletion queue directory %s has too many entries; aborting", defaults.DeleteQueueDir)
}
return fmt.Errorf("attempt to delete with no valid connection")
}

// hash endpoint id for a random filename
h := sha256.New()
h.Write([]byte(id))
filename := fmt.Sprintf("%x.delete", h.Sum(nil))
path := filepath.Join(defaults.DeleteQueueDir, filename)
// EndpointDeleteMany deletes multiple endpoints based on the endpoint deletion request,
// either by directly accessing the API or dropping in a queued-deletion file.
func (dc *DeletionFallbackClient) EndpointDeleteMany(req *models.EndpointBatchDeleteRequest) error {
if dc.cli != nil {
return dc.cli.EndpointDeleteMany(req)
}

err = os.WriteFile(path, []byte(id), 0644)
// fall-back mode
if dc.lockfile != nil {
dc.logger.WithField(logfields.Request, req).Info("Queueing endpoint batch deletion request")
b, err := req.MarshalBinary()
if err != nil {
dc.logger.WithField(logfields.Path, path).WithError(err).Error("failed to write deletion file")
return fmt.Errorf("failed to write deletion file %s: %w", path, err)
return fmt.Errorf("failed to marshal endpoint delete request: %w", err)
}
dc.logger.Info("wrote queued deletion file")
return nil
return dc.enqueueDeletionRequestLocked(string(b))
}

return fmt.Errorf("attempt to delete with no valid connection")
}

// enqueueDeletionRequestLocked enqueues the encoded endpoint deletion request into the
// endpoint deletion queue. Requires the caller to hold the deletion queue lock.
func (dc *DeletionFallbackClient) enqueueDeletionRequestLocked(contents string) error {
// sanity check: if there are too many queued deletes, just return error
// back up to the kubelet. If we get here, it's either because something
// has gone wrong with the kubelet, or the agent has been down for a very
// long time. To guard aganst long agent startup times (when it empties the
// queue), limit us to 256 queued deletions. If this does, indeed, overflow,
// then the kubelet will get the failure and eventually retry deletion.
files, err := os.ReadDir(defaults.DeleteQueueDir)
if err != nil {
dc.logger.WithField(logfields.Path, defaults.DeleteQueueDir).WithError(err).Error("failed to list deletion queue directory")
return err
}
if len(files) > maxDeletionFiles {
return fmt.Errorf("deletion queue directory %s has too many entries; aborting", defaults.DeleteQueueDir)
}

// hash endpoint id for a random filename
h := sha256.New()
h.Write([]byte(contents))
filename := fmt.Sprintf("%x.delete", h.Sum(nil))
path := filepath.Join(defaults.DeleteQueueDir, filename)

err = os.WriteFile(path, []byte(contents), 0644)
if err != nil {
dc.logger.WithField(logfields.Path, path).WithError(err).Error("failed to write deletion file")
return fmt.Errorf("failed to write deletion file %s: %w", path, err)
}
dc.logger.Info("wrote queued deletion file")
return nil
}
16 changes: 8 additions & 8 deletions plugins/cilium-cni/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,14 +663,14 @@ func cmdDel(args *skel.CmdArgs) error {
return err
}

id := endpointid.NewCNIAttachmentID(args.ContainerID, args.IfName)
if err := c.EndpointDelete(id); err != nil {
// EndpointDelete returns an error in the following scenarios:
// DeleteEndpointIDInvalid: Invalid delete parameters, no need to retry
// DeleteEndpointIDNotFound: No need to retry
// DeleteEndpointIDErrors: Errors encountered while deleting,
// the endpoint is always deleted though, no
// need to retry
req := &models.EndpointBatchDeleteRequest{ContainerID: args.ContainerID}
if err := c.EndpointDeleteMany(req); err != nil {
// EndpointDeleteMany returns an error in the following scenarios:
// DeleteEndpointInvalid: Invalid delete parameters, no need to retry
// DeleteEndpointNotFound: No need to retry
// DeleteEndpointErrors: Errors encountered while deleting,
// the endpoint is always deleted though, no
// need to retry
log.WithError(err).Warning("Errors encountered while deleting endpoint")
}

Expand Down

0 comments on commit 7666581

Please sign in to comment.