Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust delta upload timeout #227

Merged
merged 5 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var (
kubernetesDeltaEnabled = pflag.Bool("kubernetes-delta-enabled", true, "Enable kubernetes delta sync")
kubernetesDeltaReportInterval = pflag.Duration("kubernetes-delta-interval", 15*time.Second, "Interval to report kubernetes object changes to cast backend (default `15s`, set to `0s` to disable)")
initialKubernetesDeltaReportDelay = pflag.Duration("kubernetes-delta-init-delay", 60*time.Second, "Initial delay to wait before starting reporting first kubernetes object deltas (first send report is full snapshot, this might take some time for large clusters. default: `1m`)")
kubernetesDeltaSendTimeout = pflag.Duration("kubernetes-delta-send-timeout", 10*time.Second, "Kubernetes deltas send timeout")
kubernetesDeltaSendTimeout = pflag.Duration("kubernetes-delta-send-timeout", 3*time.Minute, "Kubernetes deltas send timeout")

imageScanEnabled = pflag.Bool("image-scan-enabled", false, "Enable image scanning")
imageScanInterval = pflag.Duration("image-scan-interval", 30*time.Second, "Image scan scheduling interval")
Expand All @@ -71,6 +71,7 @@ var (

kubeLinterEnabled = pflag.Bool("kube-linter-enabled", false, "Kube linter enabled")
kubeLinterScanInterval = pflag.Duration("kube-linter-scan-interval", 60*time.Second, "Kube linter scan interval")
kubeLinterInitDelay = pflag.Duration("kube-linter-init-delay", 60*time.Second, "Kube linter init delay")

jobsCleanupInterval = pflag.Duration("jobs-cleanup", 10*time.Minute, "Jobs cleanup interval")
jobsCleanupJobAge = pflag.Duration("jobs-cleanup-job-age", 10*time.Minute, "Jobs cleanup job age")
Expand Down Expand Up @@ -140,6 +141,7 @@ func main() {
Linter: kubelinter.Config{
Enabled: *kubeLinterEnabled,
ScanInterval: *kubeLinterScanInterval,
InitDelay: *kubeLinterInitDelay,
},
KubeBench: kubebench.Config{
Enabled: *kubeBenchEnabled,
Expand Down
22 changes: 12 additions & 10 deletions cmd/controller/state/delta/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,15 @@ func (c *Controller) Run(ctx context.Context) error {
t := time.NewTicker(c.cfg.Interval)
defer t.Stop()

firstDeltaReport := true

firstDeltaReport := false
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-t.C:
c.sendDeltas(ctx, firstDeltaReport)
if err := c.sendDeltas(ctx, firstDeltaReport); err != nil {
c.log.Errorf("sending deltas: %v", err)
}
if firstDeltaReport {
firstDeltaReport = false
}
Expand Down Expand Up @@ -135,10 +136,10 @@ func (c *Controller) OnDelete(obj kube.Object) {
c.recordDeltaEvent(castaipb.KubernetesDeltaItemEvent_DELTA_REMOVE, obj)
}

func (c *Controller) sendDeltas(ctx context.Context, firstDeltaReport bool) {
func (c *Controller) sendDeltas(ctx context.Context, firstDeltaReport bool) error {
pendingDeltas := c.popPendingItems()
if len(pendingDeltas) == 0 {
return
return nil
}
start := time.Now()

Expand All @@ -153,28 +154,29 @@ func (c *Controller) sendDeltas(ctx context.Context, firstDeltaReport bool) {
if firstDeltaReport {
meta = append(meta, "x-delta-full-snapshot", "true")
}

ctx = metadata.AppendToOutgoingContext(ctx, meta...)
deltaStream, err := c.castaiClient.KubernetesDeltaIngest(ctx, grpc.UseCompressor(gzip.Name))
if err != nil && !errors.Is(err, context.Canceled) {
c.log.Warnf("creating delta upload stream: %v", err)
return
return err
}
defer func() {
_ = deltaStream.CloseSend()
}()

var sentDeltasCount int
for _, item := range pendingDeltas {
item := item
anjmao marked this conversation as resolved.
Show resolved Hide resolved
pbItem := c.toCastaiDelta(item)
if err := c.sendDeltaItem(ctx, deltaStream, pbItem); err != nil {
c.log.Warnf("sending delta item: %v", err)
// Return any remaining items back to pending list.
c.upsertPendingItems(pendingDeltas[sentDeltasCount:])
return
return err
}
sentDeltasCount++
}
c.log.Infof("sent deltas, id=%v, count=%d/%d, duration=%v", deltaID, len(pendingDeltas), sentDeltasCount, time.Since(start))
return nil
}

func (c *Controller) sendDeltaItem(ctx context.Context, stream castaipb.RuntimeSecurityAgentAPI_KubernetesDeltaIngestClient, item *castaipb.KubernetesDeltaItem) error {
Expand All @@ -198,7 +200,7 @@ func (c *Controller) sendDeltaItem(ctx context.Context, stream castaipb.RuntimeS
), ctx,
), func(err error, duration time.Duration) {
if err != nil {
c.log.Warnf("sending delta item: %v", err)
c.log.Warnf("sending delta item, duration=%v: %v", duration, err)
}
})
}
Expand Down
46 changes: 43 additions & 3 deletions cmd/controller/state/delta/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package delta
import (
"context"
"errors"
"sort"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -184,14 +185,53 @@ func TestController(t *testing.T) {

ctrl.OnAdd(dep1)
r.Len(ctrl.pendingItems, 1)
ctrl.sendDeltas(ctx, false)
r.NoError(ctrl.sendDeltas(ctx, false))
r.Len(ctrl.pendingItems, 0)

ctrl.OnAdd(dep1)
r.Len(ctrl.pendingItems, 1)
ctrl.sendDeltas(ctx, false)
r.ErrorIs(ctrl.sendDeltas(ctx, false), context.DeadlineExceeded)
r.Len(ctrl.pendingItems, 1)
})

t.Run("send multiple delta items", func(t *testing.T) {
dep1 := &appsv1.Deployment{
TypeMeta: metav1.TypeMeta{Kind: "Deployment", APIVersion: "v1"},
ObjectMeta: metav1.ObjectMeta{
Name: "dep1",
Namespace: "default",
UID: types.UID("111b56a9-ab5e-4a35-93af-f092e2f63011"),
Labels: map[string]string{"l1": "v1"},
},
}
dep2 := &appsv1.Deployment{
TypeMeta: metav1.TypeMeta{Kind: "Deployment", APIVersion: "v1"},
ObjectMeta: metav1.ObjectMeta{
Name: "dep2",
Namespace: "default",
UID: types.UID("111b56a9-ab5e-4a35-93af-f092e2f63012"),
Labels: map[string]string{"l2": "v2"},
},
}

r := require.New(t)
client := newMockClient()
ctrl := newTestController(log, client)
ctrl.castaiClient = client
ctrl.OnAdd(dep1)
ctrl.OnAdd(dep2)

err := ctrl.sendDeltas(ctx, false)
r.NoError(err)
r.Len(client.deltas, 2)
sort.Slice(client.deltas, func(i, j int) bool {
return client.deltas[i].ObjectName < client.deltas[j].ObjectName
})
r.Equal(dep1.Labels, client.deltas[0].ObjectLabels)
r.Equal(dep1.ObjectMeta.Name, client.deltas[0].ObjectName)
r.Equal(dep2.Labels, client.deltas[1].ObjectLabels)
r.Equal(dep2.ObjectMeta.Name, client.deltas[1].ObjectName)
})
}

func newMockClient() *mockCastaiClient {
Expand All @@ -212,7 +252,7 @@ func newMockClient() *mockCastaiClient {
func newTestController(log *logging.Logger, client *mockCastaiClient) *Controller {
return NewController(
log,
Config{Interval: 1 * time.Millisecond, InitialDeltay: 1 * time.Millisecond},
Config{Interval: 1 * time.Millisecond, InitialDeltay: 1 * time.Millisecond, SendTimeout: 10 * time.Millisecond},
client,
&mockPodOwnerGetter{},
)
Expand Down
7 changes: 7 additions & 0 deletions cmd/controller/state/kubelinter/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type castaiClient interface {
type Config struct {
Enabled bool
ScanInterval time.Duration `validate:"required"`
InitDelay time.Duration
}

func NewController(log *logging.Logger, cfg Config, linter *Linter, castaiClient castaiClient) *Controller {
Expand Down Expand Up @@ -73,6 +74,12 @@ func (c *Controller) Run(ctx context.Context) error {
c.log.Info("running")
defer c.log.Infof("stopping")

select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(c.cfg.InitDelay):
}

for {
select {
case <-ctx.Done():
Expand Down
1 change: 1 addition & 0 deletions e2e/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func installChart(ns, imageTag string) ([]byte, error) {
--set controller.extraArgs.kube-bench-cloud-provider=gke \
--set controller.extraArgs.kube-linter-enabled=true \
--set controller.extraArgs.kube-linter-scan-interval=5s \
--set controller.extraArgs.kube-linter-init-delay=5s \
--set castai.grpcAddr=%s \
--set castai.apiKey=%s \
--set castai.clusterID=%s \
Expand Down
2 changes: 2 additions & 0 deletions e2e/oom-generator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ apiVersion: apps/v1
kind: Deployment
metadata:
name: oom-generator
labels:
app: oom-generator
spec:
replicas: 1
selector:
Expand Down
Loading