Skip to content
This repository was archived by the owner on Mar 9, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 62 additions & 2 deletions integration/container_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,66 @@ import (
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
)

func TestContainerLogWithoutTailingNewLine(t *testing.T) {
testPodLogDir, err := ioutil.TempDir("/tmp", "container-log-without-tailing-newline")
require.NoError(t, err)
defer os.RemoveAll(testPodLogDir)

t.Log("Create a sandbox with log directory")
sbConfig := PodSandboxConfig("sandbox", "container-log-without-tailing-newline",
WithPodLogDirectory(testPodLogDir),
)
sb, err := runtimeService.RunPodSandbox(sbConfig, *runtimeHandler)
require.NoError(t, err)
defer func() {
assert.NoError(t, runtimeService.StopPodSandbox(sb))
assert.NoError(t, runtimeService.RemovePodSandbox(sb))
}()

const (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit these constants are defined twice in this source file..

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the test image is per test, they just happen to have the same value. :p

I can send a following up PR to add a global busyboxImage variable, and use it in all integration tests.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, on a second thought, I think the image and test container name is per-test, let's just keep it per-test constants. :P

testImage = "busybox"
containerName = "test-container"
)
t.Logf("Pull test image %q", testImage)
img, err := imageService.PullImage(&runtime.ImageSpec{Image: testImage}, nil)
require.NoError(t, err)
defer func() {
assert.NoError(t, imageService.RemoveImage(&runtime.ImageSpec{Image: img}))
}()

t.Log("Create a container with log path")
cnConfig := ContainerConfig(
containerName,
testImage,
WithCommand("sh", "-c", "printf abcd"),
WithLogPath(containerName),
)
cn, err := runtimeService.CreateContainer(sb, cnConfig, sbConfig)
require.NoError(t, err)

t.Log("Start the container")
require.NoError(t, runtimeService.StartContainer(cn))

t.Log("Wait for container to finish running")
require.NoError(t, Eventually(func() (bool, error) {
s, err := runtimeService.ContainerStatus(cn)
if err != nil {
return false, err
}
if s.GetState() == runtime.ContainerState_CONTAINER_EXITED {
return true, nil
}
return false, nil
}, time.Second, 30*time.Second))

t.Log("Check container log")
content, err := ioutil.ReadFile(filepath.Join(testPodLogDir, containerName))
assert.NoError(t, err)
checkContainerLog(t, string(content), []string{
fmt.Sprintf("%s %s %s", runtime.Stdout, runtime.LogTagPartial, "abcd"),
})
}

func TestLongContainerLog(t *testing.T) {
testPodLogDir, err := ioutil.TempDir("/tmp", "long-container-log")
require.NoError(t, err)
Expand Down Expand Up @@ -66,9 +126,9 @@ func TestLongContainerLog(t *testing.T) {
longLineCmd := fmt.Sprintf("i=0; while [ $i -lt %d ]; do printf %s; i=$((i+1)); done", maxSize+1, "c")
cnConfig := ContainerConfig(
containerName,
"busybox",
testImage,
WithCommand("sh", "-c",
fmt.Sprintf("%s; echo; %s; echo; %s", shortLineCmd, maxLenLineCmd, longLineCmd)),
fmt.Sprintf("%s; echo; %s; echo; %s; echo", shortLineCmd, maxLenLineCmd, longLineCmd)),
Copy link
Member

@mikebrow mikebrow Jan 24, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add a test to check processing of the carriage return processing '\r'.. but we can do that later.. need to get ci back up.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is for Windows, if I remember correctly.

WithLogPath(containerName),
)
cn, err := runtimeService.CreateContainer(sb, cnConfig, sbConfig)
Expand Down
19 changes: 7 additions & 12 deletions pkg/server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,17 +112,17 @@ func convertEvent(e *gogotypes.Any) (string, interface{}, error) {
return "", nil, errors.Wrap(err, "failed to unmarshalany")
}

switch evt.(type) {
switch e := evt.(type) {
case *eventtypes.TaskExit:
id = evt.(*eventtypes.TaskExit).ContainerID
id = e.ContainerID
case *eventtypes.TaskOOM:
id = evt.(*eventtypes.TaskOOM).ContainerID
id = e.ContainerID
case *eventtypes.ImageCreate:
id = evt.(*eventtypes.ImageCreate).Name
id = e.Name
case *eventtypes.ImageUpdate:
id = evt.(*eventtypes.ImageUpdate).Name
id = e.Name
case *eventtypes.ImageDelete:
id = evt.(*eventtypes.ImageDelete).Name
id = e.Name
default:
return "", nil, errors.New("unsupported event")
}
Expand Down Expand Up @@ -200,9 +200,8 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
ctx, cancel := context.WithTimeout(ctx, handleEventTimeout)
defer cancel()

switch any.(type) {
switch e := any.(type) {
case *eventtypes.TaskExit:
e := any.(*eventtypes.TaskExit)
logrus.Infof("TaskExit event %+v", e)
// Use ID instead of ContainerID to rule out TaskExit event for exec.
cntr, err := em.c.containerStore.Get(e.ID)
Expand All @@ -226,7 +225,6 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
}
return nil
case *eventtypes.TaskOOM:
e := any.(*eventtypes.TaskOOM)
logrus.Infof("TaskOOM event %+v", e)
// For TaskOOM, we only care which container it belongs to.
cntr, err := em.c.containerStore.Get(e.ContainerID)
Expand All @@ -244,15 +242,12 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
return errors.Wrap(err, "failed to update container status for TaskOOM event")
}
case *eventtypes.ImageCreate:
e := any.(*eventtypes.ImageCreate)
logrus.Infof("ImageCreate event %+v", e)
return em.c.updateImage(ctx, e.Name)
case *eventtypes.ImageUpdate:
e := any.(*eventtypes.ImageUpdate)
logrus.Infof("ImageUpdate event %+v", e)
return em.c.updateImage(ctx, e.Name)
case *eventtypes.ImageDelete:
e := any.(*eventtypes.ImageDelete)
logrus.Infof("ImageDelete event %+v", e)
return em.c.updateImage(ctx, e.Name)
}
Expand Down
78 changes: 69 additions & 9 deletions pkg/server/io/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package io
import (
"bufio"
"bytes"
"fmt"
"io"
"io/ioutil"
"time"
Expand Down Expand Up @@ -61,6 +62,56 @@ func NewCRILogger(path string, w io.Writer, stream StreamType, maxLen int) (io.W
return pwc, stop
}

// bufio.ReadLine in golang eats both read errors and tailing newlines
// (See https://golang.org/pkg/bufio/#Reader.ReadLine). When reading
// to io.EOF, it is impossible for the caller to figure out whether
// there is a newline at the end, for example:
// 1) When reading "CONTENT\n", it returns "CONTENT" without error;
// 2) When reading "CONTENT", it also returns "CONTENT" without error.
//
// To differentiate these 2 cases, we need to write a readLine function
// ourselves to not ignore the error.
//
// The code is similar with https://golang.org/src/bufio/bufio.go?s=9537:9604#L359.
// The only difference is that it returns all errors from `ReadSlice`.
//
// readLine returns err != nil if and only if line does not end with a new line.
func readLine(b *bufio.Reader) (line []byte, isPrefix bool, err error) {
line, err = b.ReadSlice('\n')
if err == bufio.ErrBufferFull {
// Handle the case where "\r\n" straddles the buffer.
if len(line) > 0 && line[len(line)-1] == '\r' {
// Unread the last '\r'
if err := b.UnreadByte(); err != nil {
panic(fmt.Sprintf("invalid unread %v", err))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

panic seems harsh.. but I see that's the pattern. Still seems harsh.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a panic in golang source code as well.

}
line = line[:len(line)-1]
}
return line, true, nil
}

if len(line) == 0 {
if err != nil {
line = nil
}
return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isPrefix isn't set in the function so by default it's false .. but still feels odd to return it "naked" without setting

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code is copied from golang source code. Want to make minimum change so that it is easy to compare with the golang source code to find out what they've changed. :)

}

if line[len(line)-1] == '\n' {
// "ReadSlice returns err != nil if and only if line does not end in delim"
// (See https://golang.org/pkg/bufio/#Reader.ReadSlice).
if err != nil {
panic(fmt.Sprintf("full read with unexpected error %v", err))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

Copy link
Member Author

@Random-Liu Random-Liu Jan 24, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If not putting a panic, I would just leave a comment there, so I think a panic is better.

}
drop := 1
if len(line) > 1 && line[len(line)-2] == '\r' {
drop = 2
}
line = line[:len(line)-drop]
}
return
}

func redirectLogs(path string, rc io.ReadCloser, w io.Writer, s StreamType, maxLen int) {
defer rc.Close()
var (
Expand Down Expand Up @@ -88,7 +139,16 @@ func redirectLogs(path string, rc io.ReadCloser, w io.Writer, s StreamType, maxL
}
for {
var stop bool
newLine, isPrefix, err := r.ReadLine()
newLine, isPrefix, err := readLine(r)
// NOTE(random-liu): readLine can return actual content even if there is an error.
if len(newLine) > 0 {
// Buffer returned by ReadLine will change after
// next read, copy it.
l := make([]byte, len(newLine))
copy(l, newLine)
buf = append(buf, l)
length += len(l)
}
if err != nil {
if err == io.EOF {
logrus.Debugf("Getting EOF from stream %q while redirecting to log file %q", s, path)
Expand All @@ -101,13 +161,6 @@ func redirectLogs(path string, rc io.ReadCloser, w io.Writer, s StreamType, maxL
}
// Stop after writing the content left in buffer.
stop = true
} else {
// Buffer returned by ReadLine will change after
// next read, copy it.
l := make([]byte, len(newLine))
copy(l, newLine)
buf = append(buf, l)
length += len(l)
}
if maxLen > 0 && length > maxLen {
exceedLen := length - maxLen
Expand All @@ -125,7 +178,14 @@ func redirectLogs(path string, rc io.ReadCloser, w io.Writer, s StreamType, maxL
if isPrefix {
continue
}
writeLine(full, bytes.Join(buf, nil))
if stop {
// readLine only returns error when the message doesn't
// end with a newline, in that case it should be treated
// as a partial line.
writeLine(partial, bytes.Join(buf, nil))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

won't the isPrefix on lines 178-179 continue/skip over this partial write?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then below the stop breaks out.. but I think you want it to continue now?

Copy link
Member Author

@Random-Liu Random-Liu Jan 24, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If readLine hits io.EOF before newline, it is still considered a "Partial" line, but isPrefix=true in that case.

On kubelet side, it will always add a newline for Full log line, and not add newline for Partial log. We want the later behavior in this case.

} else {
writeLine(full, bytes.Join(buf, nil))
}
buf = nil
length = 0
if stop {
Expand Down
15 changes: 14 additions & 1 deletion pkg/server/io/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestRedirectLogs(t *testing.T) {
maxLen: maxLen,
tag: []runtime.LogTag{
runtime.LogTagFull,
runtime.LogTagFull,
runtime.LogTagPartial,
},
content: []string{
"test stderr log 1",
Expand Down Expand Up @@ -222,6 +222,19 @@ func TestRedirectLogs(t *testing.T) {
strings.Repeat("a", defaultBufSize*10+20),
},
},
"log length longer than buffer size with tailing \\r\\n": {
input: strings.Repeat("a", defaultBufSize-1) + "\r\n" + strings.Repeat("a", defaultBufSize-1) + "\r\n",
stream: Stdout,
maxLen: -1,
tag: []runtime.LogTag{
runtime.LogTagFull,
runtime.LogTagFull,
},
content: []string{
strings.Repeat("a", defaultBufSize-1),
strings.Repeat("a", defaultBufSize-1),
},
},
} {
t.Logf("TestCase %q", desc)
rc := ioutil.NopCloser(strings.NewReader(test.input))
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/strings.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import "strings"
// Comparison is case insensitive.
func InStringSlice(ss []string, str string) bool {
for _, s := range ss {
if strings.ToLower(s) == strings.ToLower(str) {
if strings.EqualFold(s, str) {
return true
}
}
Expand All @@ -34,7 +34,7 @@ func InStringSlice(ss []string, str string) bool {
func SubtractStringSlice(ss []string, str string) []string {
var res []string
for _, s := range ss {
if strings.ToLower(s) == strings.ToLower(str) {
if strings.EqualFold(s, str) {
continue
}
res = append(res, s)
Expand Down