Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
307 changes: 270 additions & 37 deletions cmd/kubectl-ate/cmd/logs_actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,28 @@ package cmd

import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"os"
"sort"
"strings"
"sync"
"time"

"github.com/agent-substrate/substrate/cmd/kubectl-ate/pkg/client"
"github.com/agent-substrate/substrate/proto/ateapipb"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

var followLogs bool
var rawOutput bool

var logsActorsCmd = &cobra.Command{
Expand All @@ -40,26 +49,65 @@ var logsActorsCmd = &cobra.Command{
}

func init() {
logsActorsCmd.Flags().BoolVarP(&followLogs, "follow", "f", false, "Specify if the logs should be streamed.")
logsActorsCmd.Flags().BoolVar(&rawOutput, "raw", false, "Output raw JSON log lines instead of pretty-printed format")
logsCmd.AddCommand(logsActorsCmd)
}

func runLogsActor(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
actorID := args[0]
// AteAPIClient is the narrow slice of the ate-api-server gRPC client that the
// logs command relies on. Keeping it as an interface lets the runner be driven
// by a substitute implementation.
type AteAPIClient interface {
	// GetActor looks up the actor named by req.ActorId.
	GetActor(ctx context.Context, req *ateapipb.GetActorRequest, opts ...grpc.CallOption) (*ateapipb.GetActorResponse, error)
	// Close is called once by LogsActorRunner.Run when it returns.
	Close()
}

apiClient, err := client.NewClient(ctx, kubeconfig, k8sContext, endpoint, traceEnabled)
if err != nil {
return fmt.Errorf("failed to connect to ate-api-server: %w", err)
// PodLogsStreamer is the seam through which the runner obtains a pod's log
// output, so the source of the bytes can be swapped out.
type PodLogsStreamer interface {
	// StreamLogs opens the log stream of the pod podName in namespace, shaped
	// by opts. The caller is responsible for closing the returned reader.
	StreamLogs(
		ctx context.Context,
		namespace string,
		podName string,
		opts *corev1.PodLogOptions,
	) (io.ReadCloser, error)
}

// k8sPodLogsStreamer implements PodLogsStreamer on top of a Kubernetes
// clientset; it is the implementation wired in by runLogsActor.
type k8sPodLogsStreamer struct {
// clientset is the Kubernetes API handle used to request pod logs.
clientset kubernetes.Interface
}

// StreamLogs builds a log request for podName in namespace with the given
// options and opens it as a stream bound to ctx.
func (s *k8sPodLogsStreamer) StreamLogs(ctx context.Context, namespace, podName string, opts *corev1.PodLogOptions) (io.ReadCloser, error) {
	pods := s.clientset.CoreV1().Pods(namespace)
	logReq := pods.GetLogs(podName, opts)
	return logReq.Stream(ctx)
}

// LogsActorRunner executes the log printing or streaming for a single actor.
// Its dependencies and timings are plain fields so they can be substituted;
// Run replaces any non-positive interval with a default before use.
type LogsActorRunner struct {
// apiClient resolves the actor to its current pod; Run closes it on return.
apiClient AteAPIClient
// streamer opens the log stream of the pod hosting the actor.
streamer PodLogsStreamer
// stdout receives the log lines that belong to the actor.
stdout io.Writer
// stderr receives status notices, such as which pod the actor is running on.
stderr io.Writer
// follow selects continuous streaming (runFollow) over a single pass (runOneShot).
follow bool
// raw is forwarded to filterAndDisplayLogLine to request unformatted output.
raw bool
// pollInterval is the wait before retrying in follow mode after a failed
// actor lookup, a not-yet-running actor, or a failed stream open.
pollInterval time.Duration
// reconnectInterval is the wait after a followed stream ends before the
// actor is looked up again.
reconnectInterval time.Duration
// tickerInterval is the period at which the migration monitor re-queries
// the actor while a stream is open.
tickerInterval time.Duration
}

// Run executes the logs command for actorID.
//
// Any interval that is zero or negative is replaced with its default first
// (2s poll, 1s reconnect, 2s ticker). Run then dispatches to runFollow when
// r.follow is set and to runOneShot otherwise, returning that call's error.
// r.apiClient is closed when Run returns.
func (r *LogsActorRunner) Run(ctx context.Context, actorID string) error {
// Fall back to defaults so a zero-valued runner still uses usable intervals.
if r.pollInterval <= 0 {
r.pollInterval = 2 * time.Second
}
if r.reconnectInterval <= 0 {
r.reconnectInterval = 1 * time.Second
}
if r.tickerInterval <= 0 {
r.tickerInterval = 2 * time.Second
}
defer apiClient.Close()

k8sClient, err := client.NewK8sClientset(kubeconfig, k8sContext)
if err != nil {
return fmt.Errorf("failed to create kubernetes client: %w", err)
// The runner owns the API client from here on and releases it on every exit path.
defer r.apiClient.Close()
if r.follow {
return r.runFollow(ctx, actorID)
}
return r.runOneShot(ctx, actorID)
}

actorResp, err := apiClient.GetActor(ctx, &ateapipb.GetActorRequest{ActorId: actorID})
func (r *LogsActorRunner) runOneShot(ctx context.Context, actorID string) error {
actorResp, err := r.apiClient.GetActor(ctx, &ateapipb.GetActorRequest{ActorId: actorID})
if err != nil {
return fmt.Errorf("failed to get actor: %w", err)
}
Expand All @@ -76,59 +124,217 @@ func runLogsActor(cmd *cobra.Command, args []string) error {
Follow: false,
}

req := k8sClient.CoreV1().Pods(namespace).GetLogs(podName, opts)
stream, err := req.Stream(ctx)
stream, err := r.streamer.StreamLogs(ctx, namespace, podName, opts)
if err != nil {
return fmt.Errorf("failed to stream logs from pod %s: %w", podName, err)
}
defer stream.Close()

scanner := bufio.NewScanner(stream)
buf := make([]byte, 0, 64*1024)
scanner.Buffer(buf, 1024*1024) // Support up to 1MB lines
for scanner.Scan() {
filterAndDisplayLogLine(scanner.Text(), actorID, os.Stdout, rawOutput)
line := scanner.Text()
filterAndDisplayLogLine(line, actorID, r.stdout, r.raw)
}
if err := scanner.Err(); err != nil {
return fmt.Errorf("error reading log stream: %w", err)
}

return nil
}

func filterAndDisplayLogLine(line, targetActorID string, w io.Writer, raw bool) bool {
// runFollow keeps streaming the actor's logs until ctx ends or the control
// plane reports the actor as not found.
//
// Each pass looks the actor up, waits until it is running on a known pod,
// opens a following log stream on that pod and prints matching lines. The
// timestamp of the newest parsed line is remembered and used as SinceTime when
// the stream is reopened. While a stream is open, a migration monitor cancels
// it if the actor stops running or moves to another pod, after which the loop
// starts over. Transient lookup and stream-open failures are retried after
// pollInterval; a finished stream is retried after reconnectInterval.
func (r *LogsActorRunner) runFollow(ctx context.Context, actorID string) error {
	var (
		announcedPod string    // pod most recently reported on stderr
		watermark    time.Time // newest log timestamp parsed so far
	)

	// pause sleeps for d, returning the context error if ctx ends first.
	pause := func(d time.Duration) error {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(d):
			return nil
		}
	}

	for {
		// Bail out promptly if the caller has already given up.
		if err := ctx.Err(); err != nil {
			return err
		}

		resp, err := r.apiClient.GetActor(ctx, &ateapipb.GetActorRequest{ActorId: actorID})
		if err != nil {
			// A missing actor is final; every other lookup error is retried.
			if status.Code(err) == codes.NotFound {
				return fmt.Errorf("actor %s not found: %w", actorID, err)
			}
			if waitErr := pause(r.pollInterval); waitErr != nil {
				return waitErr
			}
			continue
		}

		actor := resp.GetActor()
		pod := actor.GetAteomPodName()
		ns := actor.GetAteomPodNamespace()

		// Nothing to stream until the actor is placed on a pod and running.
		streamable := pod != "" && ns != "" && actor.GetStatus() == ateapipb.Actor_STATUS_RUNNING
		if !streamable {
			if waitErr := pause(r.pollInterval); waitErr != nil {
				return waitErr
			}
			continue
		}

		// Tell the user whenever the actor shows up on a pod other than the
		// one last announced, e.g. after being resumed on another worker.
		if pod != announcedPod {
			fmt.Fprintf(r.stderr, "Actor is currently running on pod %s/%s\n", ns, pod)
			announcedPod = pod
		}

		logOpts := &corev1.PodLogOptions{Follow: true}
		if !watermark.IsZero() {
			// Resume from the last timestamp seen instead of the beginning.
			logOpts.SinceTime = &metav1.Time{Time: watermark}
		}

		streamCtx, stopStream := context.WithCancel(ctx)
		stream, err := r.streamer.StreamLogs(streamCtx, ns, pod, logOpts)
		if err != nil {
			stopStream()
			if waitErr := pause(r.pollInterval); waitErr != nil {
				return waitErr
			}
			continue
		}

		var monitor sync.WaitGroup
		r.startMigrationMonitor(streamCtx, stopStream, &monitor, actorID, pod)

		lines := bufio.NewScanner(stream)
		lines.Buffer(make([]byte, 0, 64*1024), 1024*1024) // lines of up to 1MB
		for lines.Scan() {
			if ts, _ := filterAndDisplayLogLine(lines.Text(), actorID, r.stdout, r.raw); !ts.IsZero() {
				watermark = ts
			}
		}

		// Tear the stream down and wait for the monitor goroutine to exit
		// before starting the next pass.
		stream.Close()
		stopStream()
		monitor.Wait()

		if waitErr := pause(r.reconnectInterval); waitErr != nil {
			return waitErr
		}
	}
}

// startMigrationMonitor spawns a goroutine, registered on wg, that re-queries
// the actor every tickerInterval. It calls cancel and exits as soon as a
// successful lookup shows the actor is no longer running or is hosted on a pod
// other than currentPod. Lookup errors are ignored and retried on the next
// tick. The goroutine also exits when ctx is done.
func (r *LogsActorRunner) startMigrationMonitor(
	ctx context.Context,
	cancel context.CancelFunc,
	wg *sync.WaitGroup,
	actorID string,
	currentPod string,
) {
	wg.Add(1)
	go func() {
		defer wg.Done()

		tick := time.NewTicker(r.tickerInterval)
		defer tick.Stop()

		for {
			// Wait for the next tick, or stop when the stream context ends.
			select {
			case <-ctx.Done():
				return
			case <-tick.C:
			}

			resp, err := r.apiClient.GetActor(ctx, &ateapipb.GetActorRequest{ActorId: actorID})
			if err != nil {
				// Best effort: a failed poll is simply retried later.
				continue
			}

			actor := resp.GetActor()
			unchanged := actor.GetStatus() == ateapipb.Actor_STATUS_RUNNING &&
				actor.GetAteomPodName() == currentPod
			if unchanged {
				continue
			}

			// Suspended or moved: abort the stream so the caller reconnects.
			cancel()
			return
		}
	}()
}

// runLogsActor is the cobra entry point for the actor logs subcommand. It
// takes the actor ID from the first positional argument, connects to the
// ate-api-server and to Kubernetes, and hands both to a LogsActorRunner
// configured from the command's flags.
func runLogsActor(cmd *cobra.Command, args []string) error {
	ctx := cmd.Context()
	actorID := args[0]

	ateClient, err := client.NewClient(ctx, kubeconfig, k8sContext, endpoint, traceEnabled)
	if err != nil {
		return fmt.Errorf("failed to connect to ate-api-server: %w", err)
	}

	clientset, err := client.NewK8sClientset(kubeconfig, k8sContext)
	if err != nil {
		// The runner never takes ownership on this path, so close here.
		ateClient.Close()
		return fmt.Errorf("failed to create kubernetes client: %w", err)
	}

	podLogs := &k8sPodLogsStreamer{clientset: clientset}
	runner := &LogsActorRunner{
		apiClient:         ateClient,
		streamer:          podLogs,
		stdout:            os.Stdout,
		stderr:            os.Stderr,
		follow:            followLogs,
		raw:               rawOutput,
		pollInterval:      2 * time.Second,
		reconnectInterval: 1 * time.Second,
		tickerInterval:    2 * time.Second,
	}

	// Run closes ateClient when it returns.
	return runner.Run(ctx, actorID)
}

func filterAndDisplayLogLine(line, targetActorID string, w io.Writer, raw bool) (time.Time, bool) {
var m map[string]any
if err := json.Unmarshal([]byte(line), &m); err != nil {
return false
return time.Time{}, false
}

var logTime time.Time
if tVal, ok := m["time"].(string); ok {
if t, err := time.Parse(time.RFC3339Nano, tVal); err == nil {
logTime = t
} else if t, err := time.Parse(time.RFC3339, tVal); err == nil {
logTime = t
}
}

labelsAny, ok := m["logging.googleapis.com/labels"]
if !ok {
labelsAny, ok = m["labels"]
if !ok {
return false
}
}
labels, ok := labelsAny.(map[string]any)
if !ok {
return false
var actorID string
if labelsAny != nil {
if labels, ok := labelsAny.(map[string]any); ok {
actorID, _ = labels["ate.dev/actor_id"].(string)
}
}
actorID, ok := labels["ate.dev/actor_id"].(string)
if !ok || actorID != targetActorID {
return false

matched := (actorID != "" && actorID == targetActorID)

if !matched {
return logTime, false
}

if raw {
fmt.Fprintln(w, line)
return true
return logTime, true
}

timeStr := ""
if tVal, ok := m["time"].(string); ok {
if t, err := time.Parse(time.RFC3339Nano, tVal); err == nil {
timeStr = t.Format("2006-01-02 15:04:05")
} else if t, err := time.Parse(time.RFC3339, tVal); err == nil {
timeStr = t.Format("2006-01-02 15:04:05")
} else {
timeStr = tVal
}
if !logTime.IsZero() {
timeStr = logTime.Format("2006-01-02 15:04:05")
} else if tVal, ok := m["time"].(string); ok {
timeStr = tVal
}

levelStr := "INFO"
Expand All @@ -143,9 +349,36 @@ func filterAndDisplayLogLine(line, targetActorID string, w io.Writer, raw bool)
msgStr = mVal
}

var extraParts []string
var extraKeys []string
for k := range m {
if k == "time" || k == "level" || k == "msg" || k == "message" || k == "logging.googleapis.com/labels" || k == "labels" {
continue
}
extraKeys = append(extraKeys, k)
}
sort.Strings(extraKeys)
for _, k := range extraKeys {
v := m[k]
if sVal, ok := v.(string); ok {
extraParts = append(extraParts, fmt.Sprintf("%s=%q", k, sVal))
} else {
if b, err := json.Marshal(v); err == nil {
extraParts = append(extraParts, fmt.Sprintf("%s=%s", k, string(b)))
} else {
extraParts = append(extraParts, fmt.Sprintf("%s=%v", k, v))
}
}
}

extraStr := ""
if len(extraParts) > 0 {
extraStr = " [" + strings.Join(extraParts, " ") + "]"
}

if timeStr != "" {
fmt.Fprintf(w, "[%s] ", timeStr)
}
fmt.Fprintf(w, "[%s] %s\n", levelStr, msgStr)
return true
fmt.Fprintf(w, "[%s] %s%s\n", levelStr, msgStr, extraStr)
return logTime, true
}
Loading
Loading