Skip to content

Commit

Permalink
Merge pull request #3945 from fluxcd/lenient-logs-cmd
Browse files Browse the repository at this point in the history
Make `flux logs` more lenient
  • Loading branch information
makkes committed Jun 5, 2023
2 parents f01cf5e + cbdd71e commit a3f2b1d
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 73 deletions.
49 changes: 39 additions & 10 deletions cmd/flux/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -76,7 +77,7 @@ type logsFlags struct {
sinceSeconds time.Duration
}

var logsArgs = &logsFlags{
var logsArgs = logsFlags{
tail: -1,
}

Expand Down Expand Up @@ -115,7 +116,7 @@ func logsCmdRun(cmd *cobra.Command, args []string) error {
return fmt.Errorf("no argument required")
}

pods, err := getPods(ctx, clientset, fluxSelector)
pods, err := getPods(ctx, clientset, logsArgs.fluxNamespace, fluxSelector)
if err != nil {
return err
}
Expand Down Expand Up @@ -163,13 +164,16 @@ func logsCmdRun(cmd *cobra.Command, args []string) error {
return podLogs(ctx, requests)
}

func getPods(ctx context.Context, c *kubernetes.Clientset, label string) ([]corev1.Pod, error) {
// getPods searches for all Deployments in the given namespace that match the given label and returns a list of Pods
// from these Deployments. For each Deployment a single Pod is chosen (based on various factors such as the running
// state). If no Pod is found, an error is returned.
func getPods(ctx context.Context, c *kubernetes.Clientset, ns string, label string) ([]corev1.Pod, error) {
var ret []corev1.Pod

opts := metav1.ListOptions{
LabelSelector: label,
}
deployList, err := c.AppsV1().Deployments(logsArgs.fluxNamespace).List(ctx, opts)
deployList, err := c.AppsV1().Deployments(ns).List(ctx, opts)
if err != nil {
return ret, err
}
Expand All @@ -179,7 +183,7 @@ func getPods(ctx context.Context, c *kubernetes.Clientset, label string) ([]core
opts := metav1.ListOptions{
LabelSelector: createLabelStringFromMap(label),
}
podList, err := c.CoreV1().Pods(logsArgs.fluxNamespace).List(ctx, opts)
podList, err := c.CoreV1().Pods(ns).List(ctx, opts)
if err != nil {
return ret, err
}
Expand All @@ -196,19 +200,24 @@ func getPods(ctx context.Context, c *kubernetes.Clientset, label string) ([]core
}
}

if len(ret) == 0 {
return nil, fmt.Errorf("no Flux pods found in namespace %q", ns)
}

return ret, nil
}

func parallelPodLogs(ctx context.Context, requests []rest.ResponseWrapper) error {
reader, writer := io.Pipe()
errReader, errWriter := io.Pipe()
wg := &sync.WaitGroup{}
wg.Add(len(requests))

for _, request := range requests {
go func(req rest.ResponseWrapper) {
defer wg.Done()
if err := logRequest(ctx, req, writer); err != nil {
writer.CloseWithError(err)
fmt.Fprintf(errWriter, "failed getting logs: %s\n", err)
return
}
}(request)
Expand All @@ -217,20 +226,40 @@ func parallelPodLogs(ctx context.Context, requests []rest.ResponseWrapper) error
go func() {
wg.Wait()
writer.Close()
errWriter.Close()
}()

_, err := io.Copy(os.Stdout, reader)
return err
stdoutErrCh := asyncCopy(os.Stdout, reader)
stderrErrCh := asyncCopy(os.Stderr, errReader)

return errors.Join(<-stdoutErrCh, <-stderrErrCh)
}

// asyncCopy copies all data from from dst to src asynchronously and returns a channel for reading an error value.
// This is basically an asynchronous wrapper around `io.Copy`. The returned channel is unbuffered and always is sent
// a value (either nil or the error from `io.Copy`) as soon as `io.Copy` returns.
// This function lets you copy from multiple sources into multiple destinations in parallel.
func asyncCopy(dst io.Writer, src io.Reader) <-chan error {
errCh := make(chan error)
go func(errCh chan error) {
_, err := io.Copy(dst, src)
errCh <- err
}(errCh)

return errCh
}

func podLogs(ctx context.Context, requests []rest.ResponseWrapper) error {
var retErr error
for _, req := range requests {
if err := logRequest(ctx, req, os.Stdout); err != nil {
return err
fmt.Fprintf(os.Stderr, "failed getting logs: %s\n", err)
retErr = fmt.Errorf("failed to collect logs from all Flux pods")
continue
}
}

return nil
return retErr
}

func createLabelStringFromMap(m map[string]string) string {
Expand Down
88 changes: 88 additions & 0 deletions cmd/flux/logs_e2e_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
//go:build e2e
// +build e2e

/*
Copyright 2021 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"testing"
)

func TestLogsNoArgs(t *testing.T) {
cmd := cmdTestCase{
args: "logs",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}

func TestLogsWrongNamespace(t *testing.T) {
cmd := cmdTestCase{
args: "logs --flux-namespace=default",
assert: assertError(`no Flux pods found in namespace "default"`),
}
cmd.runTestCmd(t)
}

func TestLogsAllNamespaces(t *testing.T) {
cmd := cmdTestCase{
args: "logs --all-namespaces",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}

func TestLogsSince(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since=2m",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}

func TestLogsSinceInvalid(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since=XXX",
assert: assertError(`invalid argument "XXX" for "--since" flag: time: invalid duration "XXX"`),
}
cmd.runTestCmd(t)
}

func TestLogsSinceTime(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since-time=2021-08-06T14:26:25.546Z",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}

func TestLogsSinceTimeInvalid(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since-time=XXX",
assert: assertError("XXX is not a valid (RFC3339) time"),
}
cmd.runTestCmd(t)
}

func TestLogsSinceOnlyOneAllowed(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since=2m --since-time=2021-08-06T14:26:25.546Z",
assert: assertError("at most one of `sinceTime` or `sinceSeconds` may be specified"),
}
cmd.runTestCmd(t)
}
70 changes: 7 additions & 63 deletions cmd/flux/logs_test.go → cmd/flux/logs_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,73 +30,17 @@ import (
. "github.com/onsi/gomega"
)

func TestLogsNoArgs(t *testing.T) {
cmd := cmdTestCase{
args: "logs",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}

func TestLogsAllNamespaces(t *testing.T) {
cmd := cmdTestCase{
args: "logs --all-namespaces",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}

func TestLogsSince(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since=2m",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}

func TestLogsSinceInvalid(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since=XXX",
assert: assertError(`invalid argument "XXX" for "--since" flag: time: invalid duration "XXX"`),
}
cmd.runTestCmd(t)
}

func TestLogsSinceTime(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since-time=2021-08-06T14:26:25.546Z",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}

func TestLogsSinceTimeInvalid(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since-time=XXX",
assert: assertError("XXX is not a valid (RFC3339) time"),
}
cmd.runTestCmd(t)
}

func TestLogsSinceOnlyOneAllowed(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since=2m --since-time=2021-08-06T14:26:25.546Z",
assert: assertError("at most one of `sinceTime` or `sinceSeconds` may be specified"),
}
cmd.runTestCmd(t)
}

func TestLogRequest(t *testing.T) {
mapper := &testResponseMapper{}
tests := []struct {
name string
namespace string
flags *logsFlags
flags logsFlags
assertFile string
}{
{
name: "all logs",
flags: &logsFlags{
flags: logsFlags{
tail: -1,
allNamespaces: true,
},
Expand All @@ -105,22 +49,22 @@ func TestLogRequest(t *testing.T) {
{
name: "filter by namespace",
namespace: "default",
flags: &logsFlags{
flags: logsFlags{
tail: -1,
},
assertFile: "testdata/logs/namespace.txt",
},
{
name: "filter by kind and namespace",
flags: &logsFlags{
flags: logsFlags{
tail: -1,
kind: "Kustomization",
},
assertFile: "testdata/logs/kind.txt",
},
{
name: "filter by loglevel",
flags: &logsFlags{
flags: logsFlags{
tail: -1,
logLevel: "error",
allNamespaces: true,
Expand All @@ -130,7 +74,7 @@ func TestLogRequest(t *testing.T) {
{
name: "filter by namespace, name, loglevel and kind",
namespace: "flux-system",
flags: &logsFlags{
flags: logsFlags{
tail: -1,
logLevel: "error",
kind: "Kustomization",
Expand Down Expand Up @@ -163,7 +107,7 @@ func TestLogRequest(t *testing.T) {

// reset flags to default
*kubeconfigArgs.Namespace = rootArgs.defaults.Namespace
logsArgs = &logsFlags{
logsArgs = logsFlags{
tail: -1,
}
})
Expand Down
4 changes: 4 additions & 0 deletions cmd/flux/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,10 @@ func resetCmdArgs() {
alertProviderArgs = alertProviderFlags{}
bootstrapArgs = NewBootstrapFlags()
bServerArgs = bServerFlags{}
logsArgs = logsFlags{
tail: -1,
fluxNamespace: rootArgs.defaults.Namespace,
}
buildKsArgs = buildKsFlags{}
checkArgs = checkFlags{}
createArgs = createFlags{}
Expand Down

0 comments on commit a3f2b1d

Please sign in to comment.