-
Notifications
You must be signed in to change notification settings - Fork 350
Conversation
/hold |
c7c71db
to
76cbdcb
Compare
/cc @Random-Liu PTAL |
@yanxuean: GitHub didn't allow me to request PR reviews from the following users: PTAL. Note that only containerd members and repo collaborators can review this PR, and authors cannot review their own PRs. In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
Ref #642. |
pkg/server/events.go
Outdated
@@ -43,6 +45,14 @@ type eventMonitor struct { | |||
errCh <-chan error | |||
ctx context.Context | |||
cancel context.CancelFunc | |||
backOffQueue backOffQueue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we put all backoff related thing into one struct? It will also be easier to unit test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will do
pkg/server/events.go
Outdated
logrus.WithError(err).Errorf("Failed to convert event envelope %+v", e) | ||
break | ||
} | ||
if cID, backOffIng := em.isBackOffIng(any); backOffIng { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/isBackOffIng/isInBackoff
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will do
pkg/server/events.go
Outdated
if e.Pid != sb.Status.Get().Pid { | ||
// Non-init process died, ignore the event. | ||
return | ||
return nil | ||
} | ||
// No stream attached to sandbox container. | ||
task, err := sb.Container.Task(context.Background(), nil) | ||
if err != nil { | ||
if !errdefs.IsNotFound(err) { | ||
logrus.WithError(err).Errorf("failed to load task for sandbox %q", e.ContainerID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
log in handleEvent
instead, since you've returned the error anyway.
return fmt.Errorf("failed to load task: %v", err)
And same for the following errors and also handleContainerExit
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will do
pkg/server/events.go
Outdated
em.backOffQueue[key] = queue | ||
} | ||
|
||
func (em *eventMonitor) newBackOffTimer(key string) *time.Timer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no backoff? The interval should increase if it keeps failing, right?
pkg/server/events.go
Outdated
em.enBackOff(cID, any) | ||
break | ||
} | ||
if cID, err := em.handleEvent(any); err != nil && cID != "" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why err != nil && cID != ""
?
Is it possible that err != nil
but cID == ""
? Is that an error?
Actually, you've got the container id here, why still let handleEvent
return one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To prevent accidents:)
will do
pkg/server/events.go
Outdated
case err := <-em.errCh: | ||
logrus.WithError(err).Error("Failed to handle event stream") | ||
close(closeCh) | ||
return | ||
case cID := <-em.backOffExpire: | ||
for { | ||
any := em.deBackOff(cID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When will any == nil
? Is that an error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, It is not error. It is in "for" loop. When we handle over the whole queue, it will be nil.
pkg/server/events.go
Outdated
if any == nil { | ||
break | ||
} | ||
if _, err := em.handleEvent(any); err != nil && cID != "" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cID != ""
? You don't even set it based on the return value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will do
pkg/server/events.go
Outdated
// enBackOff start to backOff again and put event to the begin of queue | ||
func (em *eventMonitor) reBackOff(key string, evt interface{}) { | ||
newEvents := []interface{}{evt} | ||
queue := em.backOffQueue[key] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just requeue, and always stop timer as long as it is not nil
.
No need to check length.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If then, We will stop timer twice when there is only one event.
@yanxuean I feel like the per container timer logic is unnecessarily complex. And we haven't done real back yet. (See #628 (comment)) I feel like it is very easy to forget starting a timer, or leave an unnecessary timer over. How about we just have a ticker which ticks every 1 second or so? And every time it ticks, we check whether there is any events which have exceed backoff time. And deal with them if there are any. This seems more reliable to me. To optimize, we can only start the ticker when there is one event enqueued. |
Will refactor it. |
cb75ccc
to
0d93897
Compare
624dad3
to
5e2e5cb
Compare
@Random-Liu PTAL, Tks |
pkg/server/events.go
Outdated
@@ -74,16 +95,40 @@ func (em *eventMonitor) start() (<-chan struct{}, error) { | |||
return nil, errors.New("event channel is nil") | |||
} | |||
closeCh := make(chan struct{}) | |||
em.backOff.ticker = time.NewTicker(1 * time.Second) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Add a start function in backoff
?
pkg/server/events.go
Outdated
@@ -54,6 +72,9 @@ func newEventMonitor(c *containerstore.Store, s *sandboxstore.Store) *eventMonit | |||
sandboxStore: s, | |||
ctx: ctx, | |||
cancel: cancel, | |||
backOff: backOff{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Add a new
function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will do
pkg/server/events.go
Outdated
logrus.WithError(err).Errorf("Failed to convert event envelope %+v", e) | ||
break | ||
} | ||
cID, backOffIng := em.backOff.isInBackOff(any) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/backOffIng/inBackoff
pkg/server/events.go
Outdated
break | ||
} | ||
if err := em.handleEvent(any); err != nil { | ||
em.backOff.enBackOff(cID, any) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
log the error
pkg/server/events.go
Outdated
case err := <-em.errCh: | ||
logrus.WithError(err).Error("Failed to handle event stream") | ||
close(closeCh) | ||
return | ||
case <-em.backOff.ticker.C: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Add a simple function to return the channel?
pkg/server/events.go
Outdated
logrus.WithError(err).Errorf("Failed to convert event envelope %+v", evt) | ||
return | ||
} | ||
func (em *eventMonitor) handleEvent(any interface{}) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's return error with description from this function, and we only need to log the error in start
.
In this way, we don't need so many logs in this function.
pkg/server/events.go
Outdated
} | ||
|
||
// enBackOff start to backOff and put event to the tail of queue | ||
func (b *backOff) enBackOff(key string, evt interface{}) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
queue, ok := b.queues[key]
if ok {
queue.events = append(queue.events, evt)
return
}
b.queues[key] = backOffQueue{
events: []interface{}{evt},
duration: initialBackoffTime,
start: time.Now(),
} // Or add a function `newBackoffQueue`
pkg/server/events.go
Outdated
} | ||
|
||
// enBackOff start to backOff again and put [nth:] events to the queue | ||
func (b *backOff) reBackOff(key string, queue backOffQueue, n int) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
func (b *backOff) reBackOff(key string, events []interface{}, oldDuration time.Duration) {
duration := 2 * oldDuration
if duration > maxBackoffTime {
duration = maxBackoffTime
}
b.queues[key] = backOffQueue{
events: events,
duration: duration,
start: time.Now(),
}
}
pkg/server/events.go
Outdated
return !now.Before(t.expireTime) | ||
} | ||
|
||
func (t backOffTime) newTime() backOffTime { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this function is unnecessary complex... Simple and straightforward code is preferred. See comments above.
pkg/server/events.go
Outdated
var containers []string | ||
now := time.Now() | ||
for c, v := range b.queues { | ||
if v.time.isExpire(now) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if time.Since(v.start) > v.duration // or add a `isExpired` function for backOffQueue
pkg/server/events_test.go
Outdated
}, | ||
} | ||
|
||
t.Logf("Should can backOff a event") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/can/be able to
And also for following ones.
pkg/server/events_test.go
Outdated
} | ||
|
||
t.Logf("Should can backOff a event") | ||
actual := newEventMonitor(nil, nil).backOff |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
? I prefer using the new
function suggested in https://github.com/containerd/cri/pull/628/files#r174375426.
pkg/server/events_test.go
Outdated
} | ||
assert.Equal(t, isQueueListEqual(t, actual.queues, expectedQueues, 1*time.Second), true) | ||
|
||
t.Logf("Should can check if the container is on backOff state") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/on/in
@@ -0,0 +1,150 @@ | |||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use a fake clock for the unit test.
https://github.com/containerd/cri/tree/master/vendor/k8s.io/apimachinery/pkg/util/clock
pkg/server/events_test.go
Outdated
@@ -0,0 +1,150 @@ | |||
/* | |||
Copyright 2018 The Kubernetes Authors. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
containerd Authors
pkg/server/events_test.go
Outdated
assert.Equal(t, isQueueListEqual(t, actual.queues, expectedQueues, 1*time.Second), true) | ||
|
||
t.Logf("Should can check if the container is on backOff state") | ||
for k, queue := range inputQueues { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should check an arbitrary container id is not in backoff.
pkg/server/events_test.go
Outdated
} | ||
} | ||
|
||
t.Logf("Should can get all keys who are expired for backOff") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/who/which
pkg/server/events_test.go
Outdated
actKeyList := actual.getExpiredContainers() | ||
assert.Equal(t, len(expKeyList), len(actKeyList)) | ||
for _, expKey := range expKeyList { | ||
found := false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assert.Contains
pkg/server/events_test.go
Outdated
assert.Equal(t, isQueueEqual(t, actQueue, expectedQueues[k], 1*time.Second), true) | ||
} | ||
|
||
t.Logf("Should not get out the event again after having gut out the backOff event") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/gut/got.
pkg/server/events_test.go
Outdated
expQueue := backOffQueue{ | ||
events: queue.events[failEventIndex:], | ||
} | ||
assert.Equal(t, isQueueEqual(t, actQueue, expQueue, 2*time.Second), true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use constant for code and test.
Signed-off-by: yanxuean <yan.xuean@zte.com.cn>
fix containerd#434 Signed-off-by: yanxuean <yan.xuean@zte.com.cn>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM with nits. I can take care of the nits in another PR.
@@ -55,6 +82,8 @@ func newEventMonitor(c *containerstore.Store, s *sandboxstore.Store) *eventMonit | |||
sandboxStore: s, | |||
ctx: ctx, | |||
cancel: cancel, | |||
backOff: newBackOff(backOffInitDuration, backOffMaxDuration, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can just use the const instead of parameterize it.
logrus.WithError(err).Errorf("Failed to convert event %+v", e) | ||
break | ||
} | ||
if em.backOff.isInBackOff(cID) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add an info log here.
t.Logf("Should be able to check that a container isn't in backOff state") | ||
notExistKey := "containerNotExist" | ||
assert.Equal(t, actual.isInBackOff(notExistKey), false) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should add a test case that getExpiredContainers
should be empty when not expired.
for _, k := range actKeyList { | ||
actKeyMap[k] = struct{}{} | ||
} | ||
assert.Equal(t, actKeyMap, expKeyMap) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can compare length, and use assert.Contains
.
@yanxuean Thanks a lot! Good job! :D |
fix #434
Signed-off-by: yanxuean yan.xuean@zte.com.cn