Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add internal queue for kubernetes watcher updates #2123

Merged
merged 2 commits into from
Nov 24, 2017

Conversation

aanm
Copy link
Member

@aanm aanm commented Nov 23, 2017

Summary of changes:

Created a function serializer that will execute the functions the same order as they were received. This will allow us to retry those same functions in case of a failure.

Fixes: #1966

Add internal queue for k8s watcher updates #1966

@aanm aanm added sig/k8s Impacts the kubernetes API, or kubernetes -> cilium internals translation layers. kind/bug This is a bug in the Cilium logic. kind/enhancement This would improve or streamline existing functionality. pending-review release-note/bug This PR fixes an issue in a previous release of Cilium. labels Nov 23, 2017
@aanm aanm requested a review from a team November 23, 2017 10:58
@aanm aanm requested a review from a team as a code owner November 23, 2017 10:58

// NewFuncSerializer returns a funcSerializer that will be used to execute
// functions in the same order they are enqueued.
func NewFuncSerializer(size uint) *funcSerializer {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported func NewFuncSerializer returns unexported type *serializer.funcSerializer, which can be annoying to use


// NewFuncSerializer returns a funcSerializer that will be used to execute
// functions in the same order they are enqueued.
func NewFuncSerializer(size uint) *funcSerializer {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/size/queueSize/

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


// NewFuncSerializer returns a funcSerializer that will be used to execute
// functions in the same order they are enqueued.
func NewFuncSerializer(size uint) *funcSerializer {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would call the whole thing FunctionQueue

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// The waitFunc will be executed with the number of times `f` had returned an
// error, in case `f` returns an error. waitFunc should return either true or
// false depending if `f` should be executed again or not.
func (fs *funcSerializer) Enqueue(f func() error, waitFunc func(int) bool) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please define a documented type for waitFunc which explains what int will be and what it has to return

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// amount of functions queued, this function can block until the function
// serializer is ready to receive more requests. In case `f` returns an error,
// `f` won't be executed again.
func (fs *funcSerializer) EnqueueOrDie(f func() error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not needed, just expose the NoRetry function

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

// Run starts the funcSerializer internal worker. It will be stopped once
// `stopCh` is closed or receives a value.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is very misleading, stopped indicates that this is a go routine which it is not. Why not just spawn the routine in NewFuncSerializer? I would also store the stop channel in the funcSerializer struct and provide a Stop() API so the channel is hidden.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

noRetry = func(i int) bool { return false }
)

type funcProp struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call this queuedFunction

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

type funcSerializer struct {
funcs chan funcProp
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call this queue

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

default:
}
retries++
if err := f.f(); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to log these errors, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's a good idea to do it here since this might be used on different places of the code. When calling the Enqueue function it should be done as follow:

serNodes.Enqueue(func() error {
    err := d.updateK8sNodeV1(oldK8sNode, newK8sNode)
    if err != nil {
         log.Debugf("this is a dedicated k8s function that failed")
    }
    return err
}, serializer.NoRetry)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, that's fine, but then document that the error will only be used to determine failure if non-nil and that it is the responsibility of the function to log the error.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@aanm aanm requested a review from tgraf November 23, 2017 13:54

// NewFunctionQueue returns a functionQueue that will be used to execute
// functions in the same order they are enqueued.
func NewFunctionQueue(queueSize uint) *functionQueue {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported func NewFunctionQueue returns unexported type *serializer.functionQueue, which can be annoying to use


// NewFunctionQueue returns a functionQueue that will be used to execute
// functions in the same order they are enqueued.
func NewFunctionQueue(queueSize uint) *functionQueue {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported func NewFunctionQueue returns unexported type *serializer.functionQueue, which can be annoying to use


// NewFunctionQueue returns a functionQueue that will be used to execute
// functions in the same order they are enqueued.
func NewFunctionQueue(queueSize uint) *functionQueue {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported func NewFunctionQueue returns unexported type *serializer.functionQueue, which can be annoying to use

package serializer

var (
// NoRetry always returns false independently the number of retries.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

independently of the number

NoRetry = func(int) bool { return false }
)

// WaitFunc will be executed with the number of times `f` had returned an
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

f is undocumented so I would rewrite this as follows:

WaitFunc will be invoked each time a queued function has returned an error. nRetries will be set to the number of consecutive execution failures that have occurred so far. The WaitFunc must return true if execution must be retried or false if the function must be returned from the queue.

Add function serializer that allows functions to be executed in order
with the ability to retry the execution in case of failure.

Signed-off-by: André Martins <andre@cilium.io>
Signed-off-by: André Martins <andre@cilium.io>
@aanm aanm requested a review from tgraf November 24, 2017 11:21
@aanm aanm added ready-to-merge This PR has passed all tests and received consensus from code owners to merge. and removed pending-review labels Nov 24, 2017
@tgraf tgraf merged commit a631ce2 into master Nov 24, 2017
@tgraf tgraf deleted the 1966-add-k8s-watcher-queue branch November 24, 2017 12:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
kind/bug This is a bug in the Cilium logic. kind/enhancement This would improve or streamline existing functionality. ready-to-merge This PR has passed all tests and received consensus from code owners to merge. release-note/bug This PR fixes an issue in a previous release of Cilium. sig/k8s Impacts the kubernetes API, or kubernetes -> cilium internals translation layers.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants