From 37e80c7c984fccaa5c4ee5ef2392c4553367d3ee Mon Sep 17 00:00:00 2001 From: haoyuxia Date: Wed, 20 May 2026 01:42:39 +0000 Subject: [PATCH] follow log support --- cmd/kubectl-ate/cmd/logs_actors.go | 321 ++++++++++++-- cmd/kubectl-ate/cmd/logs_actors_test.go | 547 +++++++++++++++++++++++- docs/observability.md | 17 +- 3 files changed, 844 insertions(+), 41 deletions(-) diff --git a/cmd/kubectl-ate/cmd/logs_actors.go b/cmd/kubectl-ate/cmd/logs_actors.go index 6496e8d..2ad045b 100644 --- a/cmd/kubectl-ate/cmd/logs_actors.go +++ b/cmd/kubectl-ate/cmd/logs_actors.go @@ -16,19 +16,29 @@ package cmd import ( "bufio" + "context" "encoding/json" + "errors" "fmt" "io" "os" + "sort" "strings" + "sync" "time" "github.com/agent-substrate/substrate/cmd/kubectl-ate/pkg/client" "github.com/agent-substrate/substrate/proto/ateapipb" "github.com/spf13/cobra" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" ) +var followLogs bool var rawOutput bool var logsActorsCmd = &cobra.Command{ @@ -40,26 +50,65 @@ var logsActorsCmd = &cobra.Command{ } func init() { + logsActorsCmd.Flags().BoolVarP(&followLogs, "follow", "f", false, "Specify if the logs should be streamed.") logsActorsCmd.Flags().BoolVar(&rawOutput, "raw", false, "Output raw JSON log lines instead of pretty-printed format") logsCmd.AddCommand(logsActorsCmd) } -func runLogsActor(cmd *cobra.Command, args []string) error { - ctx := cmd.Context() - actorID := args[0] +// AteAPIClient abstracts the gRPC client calls. 
+type AteAPIClient interface { + GetActor(ctx context.Context, in *ateapipb.GetActorRequest, opts ...grpc.CallOption) (*ateapipb.GetActorResponse, error) + Close() +} - apiClient, err := client.NewClient(ctx, kubeconfig, k8sContext, endpoint, traceEnabled) - if err != nil { - return fmt.Errorf("failed to connect to ate-api-server: %w", err) +// PodLogsStreamer abstracts log streaming from pods. +type PodLogsStreamer interface { + StreamLogs(ctx context.Context, namespace, podName string, opts *corev1.PodLogOptions) (io.ReadCloser, error) +} + +// k8sPodLogsStreamer implements PodLogsStreamer using Kubernetes Clientset. +type k8sPodLogsStreamer struct { + clientset kubernetes.Interface +} + +func (s *k8sPodLogsStreamer) StreamLogs(ctx context.Context, namespace, podName string, opts *corev1.PodLogOptions) (io.ReadCloser, error) { + return s.clientset.CoreV1().Pods(namespace).GetLogs(podName, opts).Stream(ctx) +} + +// LogsActorRunner executes the log printing or streaming. +type LogsActorRunner struct { + apiClient AteAPIClient + streamer PodLogsStreamer + stdout io.Writer + stderr io.Writer + follow bool + raw bool + pollInterval time.Duration + reconnectInterval time.Duration + tickerInterval time.Duration +} + +// Run executes the logs command. 
+func (r *LogsActorRunner) Run(ctx context.Context, actorID string) error { + if r.pollInterval <= 0 { + r.pollInterval = 2 * time.Second + } + if r.reconnectInterval <= 0 { + r.reconnectInterval = 1 * time.Second + } + if r.tickerInterval <= 0 { + r.tickerInterval = 2 * time.Second } - defer apiClient.Close() - k8sClient, err := client.NewK8sClientset(kubeconfig, k8sContext) - if err != nil { - return fmt.Errorf("failed to create kubernetes client: %w", err) + defer r.apiClient.Close() + if r.follow { + return r.runFollow(ctx, actorID) } + return r.runOneShot(ctx, actorID) +} - actorResp, err := apiClient.GetActor(ctx, &ateapipb.GetActorRequest{ActorId: actorID}) +func (r *LogsActorRunner) runOneShot(ctx context.Context, actorID string) error { + actorResp, err := r.apiClient.GetActor(ctx, &ateapipb.GetActorRequest{ActorId: actorID}) if err != nil { return fmt.Errorf("failed to get actor: %w", err) } @@ -76,59 +125,230 @@ func runLogsActor(cmd *cobra.Command, args []string) error { Follow: false, } - req := k8sClient.CoreV1().Pods(namespace).GetLogs(podName, opts) - stream, err := req.Stream(ctx) + stream, err := r.streamer.StreamLogs(ctx, namespace, podName, opts) if err != nil { return fmt.Errorf("failed to stream logs from pod %s: %w", podName, err) } defer stream.Close() scanner := bufio.NewScanner(stream) + buf := make([]byte, 0, 64*1024) + scanner.Buffer(buf, 1024*1024) // Support up to 1MB lines for scanner.Scan() { - filterAndDisplayLogLine(scanner.Text(), actorID, os.Stdout, rawOutput) + line := scanner.Text() + filterAndDisplayLogLine(line, actorID, r.stdout, r.raw) } if err := scanner.Err(); err != nil { return fmt.Errorf("error reading log stream: %w", err) } - return nil } -func filterAndDisplayLogLine(line, targetActorID string, w io.Writer, raw bool) bool { +func (r *LogsActorRunner) runFollow(ctx context.Context, actorID string) error { + var lastWorkerPod string + var lastSeenTime time.Time + + for { + select { + case <-ctx.Done(): + return 
ctx.Err() + default: + } + + actorResp, err := r.apiClient.GetActor(ctx, &ateapipb.GetActorRequest{ActorId: actorID}) + if err != nil { + if status.Code(err) == codes.NotFound { + return fmt.Errorf("actor %s not found: %w", actorID, err) + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(r.pollInterval): + continue + } + } + + actor := actorResp.GetActor() + podName := actor.GetAteomPodName() + namespace := actor.GetAteomPodNamespace() + + if podName == "" || namespace == "" || actor.GetStatus() != ateapipb.Actor_STATUS_RUNNING { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(r.pollInterval): + continue + } + } + + // actor is resumed on another worker + if podName != lastWorkerPod { + fmt.Fprintf(r.stderr, "Actor is currently running on pod %s/%s\n", namespace, podName) + lastWorkerPod = podName + } + + opts := &corev1.PodLogOptions{ + Follow: true, + } + if !lastSeenTime.IsZero() { + opts.SinceTime = &metav1.Time{Time: lastSeenTime} + } + + streamCtx, streamCancel := context.WithCancel(ctx) + stream, err := r.streamer.StreamLogs(streamCtx, namespace, podName, opts) + if err != nil { + streamCancel() + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(r.pollInterval): + continue + } + } + + var wg sync.WaitGroup + r.startMigrationMonitor(streamCtx, streamCancel, &wg, actorID, podName) + + scanner := bufio.NewScanner(stream) + buf := make([]byte, 0, 64*1024) + scanner.Buffer(buf, 1024*1024) // Support up to 1MB lines + for scanner.Scan() { + line := scanner.Text() + logTime, _ := filterAndDisplayLogLine(line, actorID, r.stdout, r.raw) + if !logTime.IsZero() { + lastSeenTime = logTime + } + } + scanErr := scanner.Err() + stream.Close() + streamCancel() + wg.Wait() + + if scanErr != nil { + if errors.Is(scanErr, bufio.ErrTooLong) { + return fmt.Errorf("log line exceeded buffer limit: %w", scanErr) + } + if ctx.Err() != nil { + return ctx.Err() + } + if !errors.Is(scanErr, context.Canceled) { + 
fmt.Fprintf(r.stderr, "Error reading log stream: %v. Reconnecting...\n", scanErr) + } + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(r.reconnectInterval): + } + } +} + +// startMigrationMonitor launches a background goroutine to query the control plane +// and aborts the stream context if the actor is suspended and then resumed to a different pod. +func (r *LogsActorRunner) startMigrationMonitor( + ctx context.Context, + cancel context.CancelFunc, + wg *sync.WaitGroup, + actorID string, + currentPod string, +) { + wg.Add(1) + go func() { + defer wg.Done() + ticker := time.NewTicker(r.tickerInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + resp, err := r.apiClient.GetActor(ctx, &ateapipb.GetActorRequest{ActorId: actorID}) + if err == nil { + act := resp.GetActor() + if act.GetStatus() != ateapipb.Actor_STATUS_RUNNING || act.GetAteomPodName() != currentPod { + // Actor suspended or migrated! Cancel stream context to reconnect. 
+ cancel() + return + } + } + } + } + }() +} + +func runLogsActor(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + actorID := args[0] + + apiClient, err := client.NewClient(ctx, kubeconfig, k8sContext, endpoint, traceEnabled) + if err != nil { + return fmt.Errorf("failed to connect to ate-api-server: %w", err) + } + + k8sClient, err := client.NewK8sClientset(kubeconfig, k8sContext) + if err != nil { + apiClient.Close() + return fmt.Errorf("failed to create kubernetes client: %w", err) + } + + runner := &LogsActorRunner{ + apiClient: apiClient, + streamer: &k8sPodLogsStreamer{clientset: k8sClient}, + stdout: os.Stdout, + stderr: os.Stderr, + follow: followLogs, + raw: rawOutput, + pollInterval: 2 * time.Second, + reconnectInterval: 1 * time.Second, + tickerInterval: 2 * time.Second, + } + + return runner.Run(ctx, actorID) +} + +func filterAndDisplayLogLine(line, targetActorID string, w io.Writer, raw bool) (time.Time, bool) { var m map[string]any if err := json.Unmarshal([]byte(line), &m); err != nil { - return false + return time.Time{}, false + } + + var logTime time.Time + if tVal, ok := m["time"].(string); ok { + if t, err := time.Parse(time.RFC3339Nano, tVal); err == nil { + logTime = t + } else if t, err := time.Parse(time.RFC3339, tVal); err == nil { + logTime = t + } } + labelsAny, ok := m["logging.googleapis.com/labels"] if !ok { labelsAny, ok = m["labels"] - if !ok { - return false - } } - labels, ok := labelsAny.(map[string]any) - if !ok { - return false + var actorID string + if labelsAny != nil { + if labels, ok := labelsAny.(map[string]any); ok { + actorID, _ = labels["ate.dev/actor_id"].(string) + } } - actorID, ok := labels["ate.dev/actor_id"].(string) - if !ok || actorID != targetActorID { - return false + + matched := (actorID != "" && actorID == targetActorID) + + if !matched { + return logTime, false } if raw { fmt.Fprintln(w, line) - return true + return logTime, true } timeStr := "" - if tVal, ok := m["time"].(string); ok { - 
if t, err := time.Parse(time.RFC3339Nano, tVal); err == nil { - timeStr = t.Format("2006-01-02 15:04:05") - } else if t, err := time.Parse(time.RFC3339, tVal); err == nil { - timeStr = t.Format("2006-01-02 15:04:05") - } else { - timeStr = tVal - } + if !logTime.IsZero() { + timeStr = logTime.Format("2006-01-02 15:04:05") + } else if tVal, ok := m["time"].(string); ok { + timeStr = tVal } levelStr := "INFO" @@ -143,9 +363,36 @@ func filterAndDisplayLogLine(line, targetActorID string, w io.Writer, raw bool) msgStr = mVal } + var extraParts []string + var extraKeys []string + for k := range m { + if k == "time" || k == "level" || k == "msg" || k == "message" || k == "logging.googleapis.com/labels" || k == "labels" { + continue + } + extraKeys = append(extraKeys, k) + } + sort.Strings(extraKeys) + for _, k := range extraKeys { + v := m[k] + if sVal, ok := v.(string); ok { + extraParts = append(extraParts, fmt.Sprintf("%s=%q", k, sVal)) + } else { + if b, err := json.Marshal(v); err == nil { + extraParts = append(extraParts, fmt.Sprintf("%s=%s", k, string(b))) + } else { + extraParts = append(extraParts, fmt.Sprintf("%s=%v", k, v)) + } + } + } + + extraStr := "" + if len(extraParts) > 0 { + extraStr = " [" + strings.Join(extraParts, " ") + "]" + } + if timeStr != "" { fmt.Fprintf(w, "[%s] ", timeStr) } - fmt.Fprintf(w, "[%s] %s\n", levelStr, msgStr) - return true + fmt.Fprintf(w, "[%s] %s%s\n", levelStr, msgStr, extraStr) + return logTime, true } diff --git a/cmd/kubectl-ate/cmd/logs_actors_test.go b/cmd/kubectl-ate/cmd/logs_actors_test.go index db26c27..292742c 100644 --- a/cmd/kubectl-ate/cmd/logs_actors_test.go +++ b/cmd/kubectl-ate/cmd/logs_actors_test.go @@ -16,8 +16,19 @@ package cmd import ( "bytes" + "context" + "fmt" + "io" "strings" + "sync" "testing" + "time" + + "github.com/agent-substrate/substrate/proto/ateapipb" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + corev1 "k8s.io/api/core/v1" ) func 
TestFilterAndDisplayLogLine(t *testing.T) { @@ -27,6 +38,7 @@ func TestFilterAndDisplayLogLine(t *testing.T) { targetActorID string raw bool wantMatched bool + wantTime string wantOutput string }{ { @@ -35,6 +47,7 @@ func TestFilterAndDisplayLogLine(t *testing.T) { targetActorID: "act-1", raw: false, wantMatched: true, + wantTime: "2026-05-16T01:03:38.602878302Z", wantOutput: `[2026-05-16 01:03:38] [INFO] Count`, }, { @@ -43,6 +56,7 @@ func TestFilterAndDisplayLogLine(t *testing.T) { targetActorID: "act-1", raw: false, wantMatched: true, + wantTime: "2026-05-16T01:03:38Z", wantOutput: `[2026-05-16 01:03:38] [WARN] Hello`, }, { @@ -51,6 +65,7 @@ func TestFilterAndDisplayLogLine(t *testing.T) { targetActorID: "act-1", raw: false, wantMatched: true, + wantTime: "", wantOutput: `[ERROR] Failed`, }, { @@ -59,6 +74,7 @@ func TestFilterAndDisplayLogLine(t *testing.T) { targetActorID: "act-1", raw: true, wantMatched: true, + wantTime: "2026-05-16T01:03:38.602878302Z", wantOutput: `{"time":"2026-05-16T01:03:38.602878302Z","level":"info","msg":"Count","logging.googleapis.com/labels":{"ate.dev/actor_id":"act-1"}}`, }, { @@ -67,14 +83,16 @@ func TestFilterAndDisplayLogLine(t *testing.T) { targetActorID: "act-1", raw: false, wantMatched: true, + wantTime: "2026-05-16T01:03:38.602878302Z", wantOutput: `[2026-05-16 01:03:38] [INFO] Count`, }, { name: "non-matching actor", - line: `{"message":"Hello world","logging.googleapis.com/labels":{"ate.dev/actor_id":"act-2"}}`, + line: `{"time":"2026-05-16T01:03:38Z","message":"Hello world","logging.googleapis.com/labels":{"ate.dev/actor_id":"act-2"}}`, targetActorID: "act-1", raw: false, wantMatched: false, + wantTime: "2026-05-16T01:03:38Z", wantOutput: "", }, { @@ -83,19 +101,43 @@ func TestFilterAndDisplayLogLine(t *testing.T) { targetActorID: "act-1", raw: false, wantMatched: false, + wantTime: "", wantOutput: "", }, + { + name: "matching actor, pretty printed with non-standard JSON fields sorted and appended", + line: 
`{"time":"2026-05-16T01:03:38Z","level":"info","msg":"Hello","traceID":"abc-123","err":"timeout","logging.googleapis.com/labels":{"ate.dev/actor_id":"act-1"}}`, + targetActorID: "act-1", + raw: false, + wantMatched: true, + wantTime: "2026-05-16T01:03:38Z", + wantOutput: `[2026-05-16 01:03:38] [INFO] Hello [err="timeout" traceID="abc-123"]`, + }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { var buf bytes.Buffer - matched := filterAndDisplayLogLine(tc.line, tc.targetActorID, &buf, tc.raw) + logTime, matched := filterAndDisplayLogLine(tc.line, tc.targetActorID, &buf, tc.raw) if matched != tc.wantMatched { t.Errorf("got matched = %v, want %v", matched, tc.wantMatched) } + if tc.wantTime != "" { + parsedTime, err := time.Parse(time.RFC3339Nano, tc.wantTime) + if err != nil { + parsedTime, _ = time.Parse(time.RFC3339, tc.wantTime) + } + if !logTime.Equal(parsedTime) { + t.Errorf("got logTime = %v, want %v", logTime, parsedTime) + } + } else { + if !logTime.IsZero() { + t.Errorf("got non-zero logTime = %v, want zero", logTime) + } + } + gotOutput := strings.TrimSpace(buf.String()) if gotOutput != tc.wantOutput { t.Errorf("got output %q, want %q", gotOutput, tc.wantOutput) @@ -103,3 +145,504 @@ func TestFilterAndDisplayLogLine(t *testing.T) { }) } } + +type mockAteAPIClient struct { + GetActorFunc func(ctx context.Context, in *ateapipb.GetActorRequest, opts ...grpc.CallOption) (*ateapipb.GetActorResponse, error) + CloseCalls int +} + +func (m *mockAteAPIClient) GetActor(ctx context.Context, in *ateapipb.GetActorRequest, opts ...grpc.CallOption) (*ateapipb.GetActorResponse, error) { + if m.GetActorFunc != nil { + return m.GetActorFunc(ctx, in, opts...) 
+ } + return nil, fmt.Errorf("GetActorFunc not implemented") +} + +func (m *mockAteAPIClient) Close() { + m.CloseCalls++ +} + +type mockPodLogsStreamer struct { + StreamLogsFunc func(ctx context.Context, namespace, podName string, opts *corev1.PodLogOptions) (io.ReadCloser, error) +} + +func (m *mockPodLogsStreamer) StreamLogs(ctx context.Context, namespace, podName string, opts *corev1.PodLogOptions) (io.ReadCloser, error) { + if m.StreamLogsFunc != nil { + return m.StreamLogsFunc(ctx, namespace, podName, opts) + } + return nil, fmt.Errorf("StreamLogsFunc not implemented") +} + +func TestLogsActorRunner_Run_OneShotSuccess(t *testing.T) { + actorID := "act-123" + podName := "pod-xyz" + namespace := "ns-abc" + + mockAPI := &mockAteAPIClient{ + GetActorFunc: func(ctx context.Context, in *ateapipb.GetActorRequest, opts ...grpc.CallOption) (*ateapipb.GetActorResponse, error) { + if in.ActorId != actorID { + return nil, fmt.Errorf("unexpected actor ID: %s", in.ActorId) + } + return &ateapipb.GetActorResponse{ + Actor: &ateapipb.Actor{ + ActorId: actorID, + AteomPodName: podName, + AteomPodNamespace: namespace, + Status: ateapipb.Actor_STATUS_RUNNING, + }, + }, nil + }, + } + + logLine := `{"time":"2026-05-16T01:03:38Z","level":"info","msg":"Hello world","logging.googleapis.com/labels":{"ate.dev/actor_id":"act-123"}}` + mockStreamer := &mockPodLogsStreamer{ + StreamLogsFunc: func(ctx context.Context, ns, name string, opts *corev1.PodLogOptions) (io.ReadCloser, error) { + if ns != namespace || name != podName { + return nil, fmt.Errorf("unexpected pod %s/%s", ns, name) + } + if opts.Follow { + return nil, fmt.Errorf("expected follow to be false in one-shot mode") + } + return io.NopCloser(strings.NewReader(logLine + "\n")), nil + }, + } + + var stdout, stderr bytes.Buffer + runner := &LogsActorRunner{ + apiClient: mockAPI, + streamer: mockStreamer, + stdout: &stdout, + stderr: &stderr, + follow: false, + raw: false, + } + + err := runner.Run(context.Background(), actorID) 
+ if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if mockAPI.CloseCalls != 1 { + t.Errorf("expected Close to be called once, got %d", mockAPI.CloseCalls) + } + + gotOutput := strings.TrimSpace(stdout.String()) + wantOutput := `[2026-05-16 01:03:38] [INFO] Hello world` + if gotOutput != wantOutput { + t.Errorf("got stdout %q, want %q", gotOutput, wantOutput) + } +} + +func TestLogsActorRunner_Run_OneShot_ActorNotRunning(t *testing.T) { + actorID := "act-123" + + mockAPI := &mockAteAPIClient{ + GetActorFunc: func(ctx context.Context, in *ateapipb.GetActorRequest, opts ...grpc.CallOption) (*ateapipb.GetActorResponse, error) { + return &ateapipb.GetActorResponse{ + Actor: &ateapipb.Actor{ + ActorId: actorID, + Status: ateapipb.Actor_STATUS_SUSPENDED, // not running + }, + }, nil + }, + } + + mockStreamer := &mockPodLogsStreamer{ + StreamLogsFunc: func(ctx context.Context, ns, name string, opts *corev1.PodLogOptions) (io.ReadCloser, error) { + return nil, fmt.Errorf("StreamLogs should not be called") + }, + } + + var stdout, stderr bytes.Buffer + runner := &LogsActorRunner{ + apiClient: mockAPI, + streamer: mockStreamer, + stdout: &stdout, + stderr: &stderr, + follow: false, + } + + err := runner.Run(context.Background(), actorID) + if err == nil { + t.Fatal("expected error, got nil") + } + + wantErrMsg := "actor act-123 is not currently running on any worker pod" + if !strings.Contains(err.Error(), wantErrMsg) { + t.Errorf("unexpected error message: %v (expected substring %q)", err, wantErrMsg) + } + + if mockAPI.CloseCalls != 1 { + t.Errorf("expected Close to be called once, got %d", mockAPI.CloseCalls) + } +} + +func TestLogsActorRunner_Run_Follow_SuspendedToRunning(t *testing.T) { + actorID := "act-123" + podName := "pod-xyz" + namespace := "ns-abc" + + var getActorCalls int + var getActorMu sync.Mutex + + mockAPI := &mockAteAPIClient{ + GetActorFunc: func(ctx context.Context, in *ateapipb.GetActorRequest, opts ...grpc.CallOption) 
(*ateapipb.GetActorResponse, error) { + getActorMu.Lock() + defer getActorMu.Unlock() + getActorCalls++ + + if getActorCalls == 1 { + // First call: suspended + return &ateapipb.GetActorResponse{ + Actor: &ateapipb.Actor{ + ActorId: actorID, + Status: ateapipb.Actor_STATUS_SUSPENDED, + }, + }, nil + } + + // Subsequent calls: running + return &ateapipb.GetActorResponse{ + Actor: &ateapipb.Actor{ + ActorId: actorID, + AteomPodName: podName, + AteomPodNamespace: namespace, + Status: ateapipb.Actor_STATUS_RUNNING, + }, + }, nil + }, + } + + logLine := `{"time":"2026-05-16T01:03:38Z","level":"info","msg":"Follow hello","logging.googleapis.com/labels":{"ate.dev/actor_id":"act-123"}}` + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockStreamer := &mockPodLogsStreamer{ + StreamLogsFunc: func(streamCtx context.Context, ns, name string, opts *corev1.PodLogOptions) (io.ReadCloser, error) { + if ns != namespace || name != podName { + return nil, fmt.Errorf("unexpected pod %s/%s", ns, name) + } + if !opts.Follow { + return nil, fmt.Errorf("expected follow to be true in follow mode") + } + + // Cancel main context soon to break the outer infinite loop + go func() { + time.Sleep(100 * time.Millisecond) + cancel() + }() + + if opts.SinceTime != nil { + return io.NopCloser(strings.NewReader("")), nil + } + + return io.NopCloser(strings.NewReader(logLine + "\n")), nil + }, + } + + var stdout, stderr bytes.Buffer + runner := &LogsActorRunner{ + apiClient: mockAPI, + streamer: mockStreamer, + stdout: &stdout, + stderr: &stderr, + follow: true, + raw: false, + pollInterval: 1 * time.Millisecond, + reconnectInterval: 1 * time.Millisecond, + tickerInterval: 1 * time.Millisecond, + } + + err := runner.Run(ctx, actorID) + if err != nil && err != context.Canceled { + t.Fatalf("unexpected error: %v", err) + } + + if mockAPI.CloseCalls != 1 { + t.Errorf("expected Close to be called once, got %d", mockAPI.CloseCalls) + } + + gotStderr := stderr.String() + 
wantErrStderr := fmt.Sprintf("Actor is currently running on pod %s/%s\n", namespace, podName) + if !strings.Contains(gotStderr, wantErrStderr) { + t.Errorf("got stderr %q, want it to contain %q", gotStderr, wantErrStderr) + } + + gotStdout := strings.TrimSpace(stdout.String()) + wantStdout := `[2026-05-16 01:03:38] [INFO] Follow hello` + if gotStdout != wantStdout { + t.Errorf("got stdout %q, want %q", gotStdout, wantStdout) + } +} + +func TestLogsActorRunner_Run_Follow_NotFoundActor(t *testing.T) { + actorID := "act-notfound" + + mockAPI := &mockAteAPIClient{ + GetActorFunc: func(ctx context.Context, in *ateapipb.GetActorRequest, opts ...grpc.CallOption) (*ateapipb.GetActorResponse, error) { + return nil, status.Error(codes.NotFound, "actor not found") + }, + } + + mockStreamer := &mockPodLogsStreamer{ + StreamLogsFunc: func(ctx context.Context, ns, name string, opts *corev1.PodLogOptions) (io.ReadCloser, error) { + return nil, fmt.Errorf("StreamLogs should not be called") + }, + } + + var stdout, stderr bytes.Buffer + runner := &LogsActorRunner{ + apiClient: mockAPI, + streamer: mockStreamer, + stdout: &stdout, + stderr: &stderr, + follow: true, + pollInterval: 1 * time.Millisecond, + reconnectInterval: 1 * time.Millisecond, + tickerInterval: 1 * time.Millisecond, + } + + err := runner.Run(context.Background(), actorID) + if err == nil { + t.Fatal("expected error, got nil") + } + + wantErrMsg := "actor act-notfound not found" + if !strings.Contains(err.Error(), wantErrMsg) { + t.Errorf("unexpected error: %v (expected %q)", err, wantErrMsg) + } +} + +func TestLogsActorRunner_Run_Follow_ActorMigration(t *testing.T) { + actorID := "act-migrate" + + var getActorCalls int + var getActorMu sync.Mutex + + lineRead := make(chan struct{}) + + mockAPI := &mockAteAPIClient{ + GetActorFunc: func(ctx context.Context, in *ateapipb.GetActorRequest, opts ...grpc.CallOption) (*ateapipb.GetActorResponse, error) { + getActorMu.Lock() + defer getActorMu.Unlock() + getActorCalls++ + 
+ if getActorCalls == 1 { + // 1. Initial call for stream 1: pod-1 + return &ateapipb.GetActorResponse{ + Actor: &ateapipb.Actor{ + ActorId: actorID, + AteomPodName: "pod-1", + AteomPodNamespace: "ns", + Status: ateapipb.Actor_STATUS_RUNNING, + }, + }, nil + } + + // 2. Poll call or reconnect call: pod-2 + // Wait until the first log line is actually read by the scanner to prevent premature cancellation + select { + case <-lineRead: + case <-ctx.Done(): + return nil, ctx.Err() + } + + return &ateapipb.GetActorResponse{ + Actor: &ateapipb.Actor{ + ActorId: actorID, + AteomPodName: "pod-2", + AteomPodNamespace: "ns", + Status: ateapipb.Actor_STATUS_RUNNING, + }, + }, nil + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var streamCalls int + var streamMu sync.Mutex + + mockStreamer := &mockPodLogsStreamer{ + StreamLogsFunc: func(streamCtx context.Context, ns, name string, opts *corev1.PodLogOptions) (io.ReadCloser, error) { + streamMu.Lock() + defer streamMu.Unlock() + streamCalls++ + + if streamCalls == 1 { + if name != "pod-1" { + return nil, fmt.Errorf("expected pod-1, got %s", name) + } + // Return a read closer that blocks or keeps stream open + // So the migration checking ticker gets triggered. + pr, pw := io.Pipe() + go func() { + // write one line and then keep it open + fmt.Fprintln(pw, `{"time":"2026-05-16T01:03:38Z","level":"info","msg":"line 1 from pod-1","logging.googleapis.com/labels":{"ate.dev/actor_id":"act-migrate"}}`) + close(lineRead) // guaranteed to have been read because io.Pipe is unbuffered! + // wait until context is cancelled + <-streamCtx.Done() + pw.Close() + }() + return pr, nil + } + + // Reconnection to pod-2! 
+ if name != "pod-2" { + return nil, fmt.Errorf("expected pod-2, got %s", name) + } + + // Now we can cancel the main context to exit the follow loop + cancel() + + return io.NopCloser(strings.NewReader(`{"time":"2026-05-16T01:03:39Z","level":"info","msg":"line 1 from pod-2","logging.googleapis.com/labels":{"ate.dev/actor_id":"act-migrate"}}` + "\n")), nil + }, + } + + var stdout, stderr bytes.Buffer + runner := &LogsActorRunner{ + apiClient: mockAPI, + streamer: mockStreamer, + stdout: &stdout, + stderr: &stderr, + follow: true, + pollInterval: 1 * time.Millisecond, + reconnectInterval: 1 * time.Millisecond, + tickerInterval: 1 * time.Millisecond, + } + + err := runner.Run(ctx, actorID) + if err != nil && err != context.Canceled { + t.Fatalf("unexpected error: %v", err) + } + + stdoutStr := stdout.String() + if !strings.Contains(stdoutStr, "line 1 from pod-1") { + t.Errorf("expected output to contain log from pod-1, got %q", stdoutStr) + } + if !strings.Contains(stdoutStr, "line 1 from pod-2") { + t.Errorf("expected output to contain log from pod-2, got %q", stdoutStr) + } +} + +func TestLogsActorRunner_Run_Follow_ActorSuspendedMidStream(t *testing.T) { + actorID := "act-suspended-mid" + + var getActorCalls int + var getActorMu sync.Mutex + + lineRead := make(chan struct{}) + + mockAPI := &mockAteAPIClient{ + GetActorFunc: func(ctx context.Context, in *ateapipb.GetActorRequest, opts ...grpc.CallOption) (*ateapipb.GetActorResponse, error) { + getActorMu.Lock() + defer getActorMu.Unlock() + getActorCalls++ + + // 1. Initial call: running on pod-1 + if getActorCalls == 1 { + return &ateapipb.GetActorResponse{ + Actor: &ateapipb.Actor{ + ActorId: actorID, + AteomPodName: "pod-1", + AteomPodNamespace: "ns", + Status: ateapipb.Actor_STATUS_RUNNING, + }, + }, nil + } + + // 2. 
Poll call from background ticker: suspended + if getActorCalls == 2 { + // Wait until the scanner has actually read the initial log line + select { + case <-lineRead: + case <-ctx.Done(): + return nil, ctx.Err() + } + return &ateapipb.GetActorResponse{ + Actor: &ateapipb.Actor{ + ActorId: actorID, + Status: ateapipb.Actor_STATUS_SUSPENDED, + }, + }, nil + } + + // 3. Loop reconnection call: suspended (still suspended, so it will wait) + if getActorCalls == 3 { + return &ateapipb.GetActorResponse{ + Actor: &ateapipb.Actor{ + ActorId: actorID, + Status: ateapipb.Actor_STATUS_SUSPENDED, + }, + }, nil + } + + // 4. Subsequent loop reconnection call: running again on pod-1 + return &ateapipb.GetActorResponse{ + Actor: &ateapipb.Actor{ + ActorId: actorID, + AteomPodName: "pod-1", + AteomPodNamespace: "ns", + Status: ateapipb.Actor_STATUS_RUNNING, + }, + }, nil + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var streamCalls int + var streamMu sync.Mutex + + mockStreamer := &mockPodLogsStreamer{ + StreamLogsFunc: func(streamCtx context.Context, ns, name string, opts *corev1.PodLogOptions) (io.ReadCloser, error) { + streamMu.Lock() + defer streamMu.Unlock() + streamCalls++ + + if streamCalls == 1 { + pr, pw := io.Pipe() + go func() { + fmt.Fprintln(pw, `{"time":"2026-05-16T01:03:38Z","level":"info","msg":"before suspend","logging.googleapis.com/labels":{"ate.dev/actor_id":"act-suspended-mid"}}`) + close(lineRead) // guaranteed to have been read! 
+ <-streamCtx.Done() + pw.Close() + }() + return pr, nil + } + + // Second stream (after resuming): cancel context to stop test + cancel() + + return io.NopCloser(strings.NewReader(`{"time":"2026-05-16T01:03:40Z","level":"info","msg":"after resume","logging.googleapis.com/labels":{"ate.dev/actor_id":"act-suspended-mid"}}` + "\n")), nil + }, + } + + var stdout, stderr bytes.Buffer + runner := &LogsActorRunner{ + apiClient: mockAPI, + streamer: mockStreamer, + stdout: &stdout, + stderr: &stderr, + follow: true, + pollInterval: 1 * time.Millisecond, + reconnectInterval: 1 * time.Millisecond, + tickerInterval: 1 * time.Millisecond, + } + + err := runner.Run(ctx, actorID) + if err != nil && err != context.Canceled { + t.Fatalf("unexpected error: %v", err) + } + + stdoutStr := stdout.String() + if !strings.Contains(stdoutStr, "before suspend") { + t.Errorf("expected output to contain 'before suspend', got %q", stdoutStr) + } + if !strings.Contains(stdoutStr, "after resume") { + t.Errorf("expected output to contain 'after resume', got %q", stdoutStr) + } +} diff --git a/docs/observability.md b/docs/observability.md index fd299fe..6725951 100644 --- a/docs/observability.md +++ b/docs/observability.md @@ -23,10 +23,10 @@ Agent Substrate captures container standard output/error, wraps them into struct For quick, on-demand debugging of an active actor, use the Agent Substrate CLI: ```bash -kubectl ate logs +kubectl ate logs [--follow / -f] ``` -> **Note:** `kubectl ate logs` queries the Kubernetes API of the worker pod where the actor is *currently* running. It is designed for immediate inspection of active actors. To view historical logs across past worker pods and suspension cycles, use a centralized logging backend. +> **Note:** By default, `kubectl ate logs` queries the Kubernetes API of the worker pod where the actor is *currently* running. It is designed for immediate inspection of active actors. 
To view historical logs across past worker pods and suspension cycles, use a centralized logging backend. #### Example 1: Actor Not Currently Running If an actor is suspended or not assigned to a worker pod, the CLI informs you immediately: @@ -56,6 +56,19 @@ $ kubectl ate logs test --raw {"count":9,"fshash":"JiOzRUA5Ab+aro4YnhADSSMq8gUXhh/DMNSFzl75Q7c","level":"INFO","logging.googleapis.com/labels":{"actor_id":"test","actor_namespace":"ate-demo-counter","actor_template":"counter"},"msg":"Count","time":"2026-05-19T18:40:54.957798659Z"} ``` +#### Example 4: Streaming/Live Logs (`--follow` or `-f`) +To stream actor logs in real time, append the `--follow` (or `-f`) flag. The CLI is fully actor-aware, automatically reconnecting the stream when the actor resumes after a suspension or migrates to a different worker pod: + +```bash +$ kubectl ate logs test -f +Actor is currently running on pod ate-demo-counter/counter-deployment-d8f99-m7d96 +[2026-05-19 18:39:24] [INFO] Count +[2026-05-19 18:39:34] [INFO] Count +Actor is currently running on pod ate-demo-counter/counter-deployment-ab123-x4y5z +[2026-05-19 18:40:02] [INFO] Count +``` + + --- ### Centralized Logging Backends (Multi-Dimensional Aggregation)