From 1fc6460fa16b157b0d333b96d6d93b7d273ed91a Mon Sep 17 00:00:00 2001 From: jibuji Date: Tue, 11 Oct 2022 00:02:45 +0800 Subject: [PATCH] fix: Log early abort. Fixes #9573 (#9575) Signed-off-by: pengfei.ji Co-authored-by: pengfei.ji --- util/logs/workflow-logger.go | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/util/logs/workflow-logger.go b/util/logs/workflow-logger.go index 2cc93acfe2f6..753f378082e2 100644 --- a/util/logs/workflow-logger.go +++ b/util/logs/workflow-logger.go @@ -41,6 +41,27 @@ type sender interface { Send(entry *workflowpkg.LogEntry) error } +const maxTokenLength = 1024 * 1024 +const startBufSize = 16 * 1024 + +func scanLinesOrGiveLong(data []byte, atEOF bool) (advance int, token []byte, err error) { + advance, token, err = bufio.ScanLines(data, atEOF) + if advance > 0 || token != nil || err != nil { + // bufio.ScanLines found something, use it + return + } + + // bufio.ScanLines found nothing + // if our buffer is still a reasonable size, continue scanning for regular lines + if len(data) < maxTokenLength { + return + } + + // our buffer is getting massive, stop waiting for line breaks and return data now + // this avoids bufio.ErrTooLong + return maxTokenLength, data[0:maxTokenLength], nil +} + func WorkflowLogs(ctx context.Context, wfClient versioned.Interface, kubeClient kubernetes.Interface, req request, sender sender) error { wfInterface := wfClient.ArgoprojV1alpha1().Workflows(req.GetNamespace()) _, err := wfInterface.Get(ctx, req.GetName(), metav1.GetOptions{}) @@ -111,7 +132,12 @@ func WorkflowLogs(ctx context.Context, wfClient versioned.Interface, kubeClient logCtx.Error(err) return } + scanner := bufio.NewScanner(stream) + //give it more space for long line + scanner.Buffer(make([]byte, startBufSize), maxTokenLength) + //avoid bufio.ErrTooLong error when encounters a very very long line + scanner.Split(scanLinesOrGiveLong) for scanner.Scan() { select { case <-ctx.Done(): @@ -119,7 +145,11 @@ func WorkflowLogs(ctx context.Context, wfClient versioned.Interface, kubeClient default: line := scanner.Text() parts := strings.SplitN(line, " ", 2) - content := parts[1] + //on old version k8s, the line may contains no space, hence len(parts) would equal to 1 + content := "" + if len(parts) > 1 { + content = parts[1] + } timestamp, err := time.Parse(time.RFC3339, parts[0]) if err != nil { logCtx.Errorf("unable to decode or infer timestamp from log line: %s", err)