Skip to content

Commit

Permalink
resource: Refactor the API for simplified usage
Browse files Browse the repository at this point in the history
As Resource[T] was used more it became apparent that making it an
stream.Observable and having Event[T] be an interface makes it inconvenient
to use:

- Almost always we're subscribing to it for the whole duration of the
  application and do not need to handle completion errors. Consuming the events
  via a channel is also often the preferred way as it always "for-select"
  over multiple resources.

- Event[T] contains almost the same payload for all event types, we might
  as well keep it simple and just use a single struct.

This changes Observe() into `Events(ctx, opts) <-chan Event[T]` and moves
rate limiting and error handling to be per-subscriber options. Event[T]
becomes a single struct with a kind tag to specific event type.

In order to handle large datasets at startup the events are now emitted
immediately rather than waiting for informer to synchronize the store
first.

The BenchmarkResource in resource_test.go was also ported to current master
and the difference between the unbuffered channel approach to callback was
about 2x (2600ns/op for channels, 1350ns/op for callbacks), which is negligble
enough. Benchmark was also ran with buffered event channel with no difference
to unbuffered.  Unbuffered is preferable as allows better chances for coalescing
away the intermediate updates for objects with the same key.

Signed-off-by: Jussi Maki <jussi@isovalent.com>
  • Loading branch information
joamaki committed Dec 22, 2022
1 parent de7c96f commit f3a9b0a
Show file tree
Hide file tree
Showing 14 changed files with 632 additions and 791 deletions.
1 change: 0 additions & 1 deletion clustermesh-apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ func newSvcResource(lc hive.Lifecycle, c k8sClient.Clientset) (resource.Resource
utils.ListerWatcherWithModifier(
utils.ListerWatcherFromTyped[*slim_corev1.ServiceList](c.Slim().CoreV1().Services("")),
optsModifier),
resource.WithErrorHandler(resource.AlwaysRetry),
), nil
}

Expand Down
14 changes: 0 additions & 14 deletions operator/cmd/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@
package cmd

import (
"time"

"k8s.io/client-go/util/workqueue"

"github.com/cilium/cilium/pkg/hive"
"github.com/cilium/cilium/pkg/hive/cell"
cilium_api_v2alpha1 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1"
Expand All @@ -32,16 +28,12 @@ var resourcesCell = cell.Module(
k8sutils.ListerWatcherWithModifier(
k8sutils.ListerWatcherFromTyped[*slim_core_v1.ServiceList](c.Slim().CoreV1().Services("")),
optsModifier),
resource.WithErrorHandler(resource.AlwaysRetry),
resource.WithRateLimiter(errorRateLimiter),
), nil
},
func(lc hive.Lifecycle, c k8sClient.Clientset) resource.Resource[*cilium_api_v2alpha1.CiliumLoadBalancerIPPool] {
return resource.New[*cilium_api_v2alpha1.CiliumLoadBalancerIPPool](
lc,
k8sutils.ListerWatcherFromTyped[*cilium_api_v2alpha1.CiliumLoadBalancerIPPoolList](c.CiliumV2alpha1().CiliumLoadBalancerIPPools()),
resource.WithErrorHandler(resource.AlwaysRetry),
resource.WithRateLimiter(errorRateLimiter),
)
},
),
Expand All @@ -53,9 +45,3 @@ type SharedResources struct {
Services resource.Resource[*slim_core_v1.Service]
CLBIPPools resource.Resource[*cilium_api_v2alpha1.CiliumLoadBalancerIPPool]
}

func errorRateLimiter() workqueue.RateLimiter {
// This rate limiter will retry in the following pattern
// 250ms, 500ms, 1s, 2s, 4s, 8s, 16s, 32s, .... max 5m
return workqueue.NewItemExponentialFailureRateLimiter(250*time.Millisecond, 5*time.Minute)
}

0 comments on commit f3a9b0a

Please sign in to comment.