diff --git a/README.md b/README.md index 13082b8..3ac4dcb 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,8 @@ A powerful command-line utility for debugging, monitoring, and inspecting A2A se - **Server Connectivity**: Test connections to A2A servers and retrieve agent information - **Task Management**: List, filter, and inspect tasks with detailed status information +- **Real-time Streaming**: Submit streaming tasks and monitor real-time agent responses +- **Streaming Summaries**: Summaries with Task IDs, durations, and event counts - **Conversation History**: View detailed conversation histories and message flows - **Agent Information**: Retrieve and display agent cards with capabilities - **Configuration Management**: Set, get, and list configuration values with namespace commands @@ -123,9 +125,11 @@ a2a config list # List all configuration values #### Task Commands ```bash -a2a tasks list # List available tasks -a2a tasks get # Get detailed task information -a2a tasks history # Get conversation history for a context +a2a tasks list # List available tasks +a2a tasks get # Get detailed task information +a2a tasks history # Get conversation history for a context +a2a tasks submit # Submit a task and get response +a2a tasks submit-streaming # Submit streaming task with real-time responses and summary ``` #### Server Commands diff --git a/cli/cli.go b/cli/cli.go index d8b7e78..091e473 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -696,6 +696,7 @@ var submitStreamingTaskCmd = &cobra.Command{ showRaw, _ := cmd.Flags().GetBool("raw") messageID := fmt.Sprintf("msg-%d", time.Now().Unix()) + startTime := time.Now() params := adk.MessageSendParams{ Message: adk.Message{ @@ -734,28 +735,83 @@ var submitStreamingTaskCmd = &cobra.Command{ fmt.Printf("Message ID: %s\n", messageID) fmt.Printf("\nšŸ”„ Streaming responses:\n\n") + var streamingSummary struct { + TaskID string + ContextID string + FinalStatus string + StatusUpdates int + ArtifactUpdates int + TotalEvents int + FinalMessage *adk.Message + } + for event := range eventChan { + streamingSummary.TotalEvents++ + + eventJSON, err := json.Marshal(event) + if err != nil { + logger.Error("Failed to marshal event", zap.Error(err)) + continue + } + + var genericEvent map[string]interface{} + if err := json.Unmarshal(eventJSON, &genericEvent); err != nil { + logger.Error("Failed to unmarshal generic event", zap.Error(err)) + continue + } + + kind, ok := genericEvent["kind"].(string) + if ok { + switch kind { + case "status-update": + streamingSummary.StatusUpdates++ + var statusEvent a2a.TaskStatusUpdateEvent + if err := json.Unmarshal(eventJSON, &statusEvent); err == nil { + if streamingSummary.TaskID == "" { + streamingSummary.TaskID = statusEvent.TaskID + } + if streamingSummary.ContextID == "" { + streamingSummary.ContextID = statusEvent.ContextID + } + streamingSummary.FinalStatus = string(statusEvent.Status.State) + if statusEvent.Status.Message != nil { + adkParts := make([]adk.Part, len(statusEvent.Status.Message.Parts)) + for i, part := range statusEvent.Status.Message.Parts { + adkParts[i] = adk.Part(part) + } + + adkMessage := &adk.Message{ + Kind: statusEvent.Status.Message.Kind, + MessageID: statusEvent.Status.Message.MessageID, + Role: statusEvent.Status.Message.Role, + Parts: adkParts, + ContextID: statusEvent.Status.Message.ContextID, + } + streamingSummary.FinalMessage = adkMessage + } + } + case "artifact-update": + streamingSummary.ArtifactUpdates++ + var artifactEvent a2a.TaskArtifactUpdateEvent + if err := json.Unmarshal(eventJSON, &artifactEvent); err == nil { + if streamingSummary.TaskID == "" { + streamingSummary.TaskID = artifactEvent.TaskID + } + if streamingSummary.ContextID == "" { + streamingSummary.ContextID = artifactEvent.ContextID + } + } + } + } + if showRaw { - eventJSON, err := json.MarshalIndent(event, "", " ") + eventJSONFormatted, err := json.MarshalIndent(event, "", " ") if err != nil { logger.Error("Failed to marshal event", zap.Error(err)) continue } - fmt.Printf("šŸ“” Raw Event:\n%s\n\n", eventJSON) + fmt.Printf("šŸ“” Raw Event:\n%s\n\n", eventJSONFormatted) } else { - eventJSON, err := json.Marshal(event) - if err != nil { - logger.Error("Failed to marshal event", zap.Error(err)) - continue - } - - var genericEvent map[string]interface{} - if err := json.Unmarshal(eventJSON, &genericEvent); err != nil { - logger.Error("Failed to unmarshal generic event", zap.Error(err)) - continue - } - - kind, ok := genericEvent["kind"].(string) if !ok { fmt.Printf("šŸ”” Unknown Event (no kind field)\n") continue @@ -827,18 +883,28 @@ var submitStreamingTaskCmd = &cobra.Command{ default: fmt.Printf("šŸ”” Unknown Event Type: %s\n", kind) - if showRaw { - eventJSON, err := json.MarshalIndent(event, "", " ") - if err == nil { - fmt.Printf("%s\n", eventJSON) - } - } } fmt.Printf("\n") } } - fmt.Printf("āœ… Streaming completed!\n") + duration := time.Since(startTime) + + fmt.Printf("āœ… Streaming completed!\n\n") + fmt.Printf("šŸ“‹ Streaming Summary:\n") + fmt.Printf(" Task ID: %s\n", streamingSummary.TaskID) + fmt.Printf(" Context ID: %s\n", streamingSummary.ContextID) + fmt.Printf(" Final Status: %s\n", streamingSummary.FinalStatus) + fmt.Printf(" Duration: %s\n", duration.Round(time.Millisecond)) + fmt.Printf(" Total Events: %d\n", streamingSummary.TotalEvents) + fmt.Printf(" Status Updates: %d\n", streamingSummary.StatusUpdates) + fmt.Printf(" Artifact Updates: %d\n", streamingSummary.ArtifactUpdates) + + if streamingSummary.FinalMessage != nil { + fmt.Printf(" Final Message Parts: %d\n", len(streamingSummary.FinalMessage.Parts)) + } + + fmt.Printf("\n") return nil }, } diff --git a/cli/cli_test.go b/cli/cli_test.go new file mode 100644 index 0000000..93ab738 --- /dev/null +++ b/cli/cli_test.go @@ -0,0 +1,251 @@ +package cli + +import ( + "bytes" + "context" + "fmt" + "net/http" + "os" + "strings" + "testing" + "time" + + a2a "github.com/inference-gateway/a2a-debugger/a2a" + client "github.com/inference-gateway/adk/client" + adk "github.com/inference-gateway/adk/types" + "github.com/spf13/cobra" + "go.uber.org/zap" +) + +// mockA2AClient implements the A2AClient interface for testing +type mockA2AClient struct { + sendTaskStreamingFunc func(ctx context.Context, params adk.MessageSendParams, eventChan chan<- interface{}) error +} + +func (m *mockA2AClient) GetAgentCard(ctx context.Context) (*adk.AgentCard, error) { + return nil, fmt.Errorf("not implemented") +} + +func (m *mockA2AClient) GetHealth(ctx context.Context) (*client.HealthResponse, error) { + return nil, fmt.Errorf("not implemented") +} + +func (m *mockA2AClient) ListTasks(ctx context.Context, params adk.TaskListParams) (*adk.JSONRPCSuccessResponse, error) { + return nil, fmt.Errorf("not implemented") +} + +func (m *mockA2AClient) GetTask(ctx context.Context, params adk.TaskQueryParams) (*adk.JSONRPCSuccessResponse, error) { + return nil, fmt.Errorf("not implemented") +} + +func (m *mockA2AClient) SendTask(ctx context.Context, params adk.MessageSendParams) (*adk.JSONRPCSuccessResponse, error) { + return nil, fmt.Errorf("not implemented") +} + +func (m *mockA2AClient) SendTaskStreaming(ctx context.Context, params adk.MessageSendParams, eventChan chan<- interface{}) error { + if m.sendTaskStreamingFunc != nil { + return m.sendTaskStreamingFunc(ctx, params, eventChan) + } + return fmt.Errorf("not implemented") +} + +func (m *mockA2AClient) CancelTask(ctx context.Context, params adk.TaskIdParams) (*adk.JSONRPCSuccessResponse, error) { + return nil, fmt.Errorf("not implemented") +} + +func (m *mockA2AClient) SetTimeout(timeout time.Duration) {} + +func (m *mockA2AClient) SetHTTPClient(client *http.Client) {} + +func (m *mockA2AClient) GetBaseURL() string { + return "http://localhost:8080" +} + +func (m *mockA2AClient) SetLogger(logger *zap.Logger) {} + +func (m *mockA2AClient) GetLogger() *zap.Logger { + return zap.NewNop() +} + +func TestSubmitStreamingTaskCmd_StreamingSummary(t *testing.T) { + originalClient := a2aClient + originalLogger := logger + + testLogger, _ := zap.NewDevelopment() + logger = testLogger + + mockClient := &mockA2AClient{ + sendTaskStreamingFunc: func(ctx context.Context, params adk.MessageSendParams, eventChan chan<- interface{}) error { + statusEvent := a2a.TaskStatusUpdateEvent{ + Kind: "status-update", + TaskID: "test-task-123", + ContextID: "test-context-456", + Status: a2a.TaskStatus{ + State: a2a.TaskStateWorking, + Message: &a2a.Message{ + Kind: "message", + MessageID: "msg-123", + Role: "assistant", + Parts: []a2a.Part{ + map[string]interface{}{ + "kind": "text", + "text": "Test response", + }, + }, + }, + }, + Final: false, + } + eventChan <- statusEvent + + artifactEvent := a2a.TaskArtifactUpdateEvent{ + Kind: "artifact-update", + TaskID: "test-task-123", + ContextID: "test-context-456", + Artifact: a2a.Artifact{ + ArtifactID: "artifact-123", + Parts: []a2a.Part{ + map[string]interface{}{ + "kind": "text", + "text": "Test artifact content", + }, + }, + }, + } + eventChan <- artifactEvent + + finalStatusEvent := a2a.TaskStatusUpdateEvent{ + Kind: "status-update", + TaskID: "test-task-123", + ContextID: "test-context-456", + Status: a2a.TaskStatus{ + State: a2a.TaskStateCompleted, + Message: &a2a.Message{ + Kind: "message", + MessageID: "msg-124", + Role: "assistant", + Parts: []a2a.Part{ + map[string]interface{}{ + "kind": "text", + "text": "Task completed", + }, + }, + }, + }, + Final: true, + } + eventChan <- finalStatusEvent + + return nil + }, + } + a2aClient = mockClient + + oldStdout := os.Stdout + r, w, _ := os.Pipe() + os.Stdout = w + + cmd := &cobra.Command{} + cmd.Flags().String("context-id", "", "Context ID for the task") + cmd.Flags().Bool("raw", false, "Show raw streaming event data") + + err := submitStreamingTaskCmd.RunE(cmd, []string{"test message"}) + + _ = w.Close() + os.Stdout = oldStdout + var buf bytes.Buffer + _, _ = buf.ReadFrom(r) + output := buf.String() + + a2aClient = originalClient + logger = originalLogger + + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + expectedParts := []string{ + "Streaming Summary:", + "Task ID: test-task-123", + "Context ID: test-context-456", + "Final Status: completed", + "Duration:", + "Total Events: 3", + "Status Updates: 2", + "Artifact Updates: 1", + "Final Message Parts: 1", + } + + for _, part := range expectedParts { + if !strings.Contains(output, part) { + t.Errorf("Expected output to contain '%s', but it didn't.\nActual output:\n%s", part, output) + } + } +} + +func TestSubmitStreamingTaskCmd_RawMode(t *testing.T) { + originalClient := a2aClient + originalLogger := logger + + testLogger, _ := zap.NewDevelopment() + logger = testLogger + + mockClient := &mockA2AClient{ + sendTaskStreamingFunc: func(ctx context.Context, params adk.MessageSendParams, eventChan chan<- interface{}) error { + statusEvent := a2a.TaskStatusUpdateEvent{ + Kind: "status-update", + TaskID: "test-task-456", + ContextID: "test-context-789", + Status: a2a.TaskStatus{ + State: a2a.TaskStateCompleted, + }, + Final: true, + } + eventChan <- statusEvent + + return nil + }, + } + a2aClient = mockClient + + oldStdout := os.Stdout + r, w, _ := os.Pipe() + os.Stdout = w + + cmd := &cobra.Command{} + cmd.Flags().String("context-id", "", "Context ID for the task") + cmd.Flags().Bool("raw", true, "Show raw streaming event data") + _ = cmd.Flag("raw").Value.Set("true") + + err := submitStreamingTaskCmd.RunE(cmd, []string{"test message"}) + + _ = w.Close() + os.Stdout = oldStdout + var buf bytes.Buffer + _, _ = buf.ReadFrom(r) + output := buf.String() + + a2aClient = originalClient + logger = originalLogger + + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + expectedParts := []string{ + "Raw Event:", + "Streaming Summary:", + "Task ID: test-task-456", + "Context ID: test-context-789", + "Final Status: completed", + "Total Events: 1", + "Status Updates: 1", + "Artifact Updates: 0", + } + + for _, part := range expectedParts { + if !strings.Contains(output, part) { + t.Errorf("Expected output to contain '%s', but it didn't.\nActual output:\n%s", part, output) + } + } +} diff --git a/example/README.md b/example/README.md index 19432ad..73b9d93 100644 --- a/example/README.md +++ b/example/README.md @@ -86,6 +86,30 @@ docker compose run --rm a2a-debugger tasks submit-streaming "Write a Python func docker compose run --rm a2a-debugger tasks submit-streaming "Explain how A2A streaming works" --context-id docs-ctx ``` +#### Streaming Summary Features + +After streaming completes, the debugger provides a comprehensive summary including: + +- **Task ID**: The unique identifier generated by the A2A server +- **Context ID**: The conversation context identifier +- **Final Status**: The final task status (completed, failed, etc.) +- **Duration**: Total time for the streaming operation +- **Event Counts**: Number of status updates and artifact updates received +- **Message Parts**: Count of parts in the final agent response + +Example summary output: +``` +šŸ“‹ Streaming Summary: + Task ID: task-xyz123 + Context ID: ctx-abc789 + Final Status: completed + Duration: 2.5s + Total Events: 5 + Status Updates: 3 + Artifact Updates: 2 + Final Message Parts: 2 +``` + ### Configuration Management ```bash