Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ A powerful command-line utility for debugging, monitoring, and inspecting A2A se

- **Server Connectivity**: Test connections to A2A servers and retrieve agent information
- **Task Management**: List, filter, and inspect tasks with detailed status information
- **Real-time Streaming**: Submit streaming tasks and monitor real-time agent responses
- **Streaming Summaries**: Summaries with Task IDs, durations, and event counts
- **Conversation History**: View detailed conversation histories and message flows
- **Agent Information**: Retrieve and display agent cards with capabilities
- **Configuration Management**: Set, get, and list configuration values with namespace commands
Expand Down Expand Up @@ -123,9 +125,11 @@ a2a config list # List all configuration values
#### Task Commands

```bash
a2a tasks list # List available tasks
a2a tasks get <task-id> # Get detailed task information
a2a tasks history <context-id> # Get conversation history for a context
a2a tasks list # List available tasks
a2a tasks get <task-id> # Get detailed task information
a2a tasks history <context-id> # Get conversation history for a context
a2a tasks submit <message> # Submit a task and get response
a2a tasks submit-streaming <msg> # Submit streaming task with real-time responses and summary
```

#### Server Commands
Expand Down
110 changes: 88 additions & 22 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,7 @@ var submitStreamingTaskCmd = &cobra.Command{
showRaw, _ := cmd.Flags().GetBool("raw")

messageID := fmt.Sprintf("msg-%d", time.Now().Unix())
startTime := time.Now()

params := adk.MessageSendParams{
Message: adk.Message{
Expand Down Expand Up @@ -734,28 +735,83 @@ var submitStreamingTaskCmd = &cobra.Command{
fmt.Printf("Message ID: %s\n", messageID)
fmt.Printf("\n🔄 Streaming responses:\n\n")

var streamingSummary struct {
TaskID string
ContextID string
FinalStatus string
StatusUpdates int
ArtifactUpdates int
TotalEvents int
FinalMessage *adk.Message
}

for event := range eventChan {
streamingSummary.TotalEvents++

eventJSON, err := json.Marshal(event)
if err != nil {
logger.Error("Failed to marshal event", zap.Error(err))
continue
}

var genericEvent map[string]interface{}
if err := json.Unmarshal(eventJSON, &genericEvent); err != nil {
logger.Error("Failed to unmarshal generic event", zap.Error(err))
continue
}

kind, ok := genericEvent["kind"].(string)
if ok {
switch kind {
case "status-update":
streamingSummary.StatusUpdates++
var statusEvent a2a.TaskStatusUpdateEvent
if err := json.Unmarshal(eventJSON, &statusEvent); err == nil {
if streamingSummary.TaskID == "" {
streamingSummary.TaskID = statusEvent.TaskID
}
if streamingSummary.ContextID == "" {
streamingSummary.ContextID = statusEvent.ContextID
}
streamingSummary.FinalStatus = string(statusEvent.Status.State)
if statusEvent.Status.Message != nil {
adkParts := make([]adk.Part, len(statusEvent.Status.Message.Parts))
for i, part := range statusEvent.Status.Message.Parts {
adkParts[i] = adk.Part(part)
}

adkMessage := &adk.Message{
Kind: statusEvent.Status.Message.Kind,
MessageID: statusEvent.Status.Message.MessageID,
Role: statusEvent.Status.Message.Role,
Parts: adkParts,
ContextID: statusEvent.Status.Message.ContextID,
}
streamingSummary.FinalMessage = adkMessage
}
}
case "artifact-update":
streamingSummary.ArtifactUpdates++
var artifactEvent a2a.TaskArtifactUpdateEvent
if err := json.Unmarshal(eventJSON, &artifactEvent); err == nil {
if streamingSummary.TaskID == "" {
streamingSummary.TaskID = artifactEvent.TaskID
}
if streamingSummary.ContextID == "" {
streamingSummary.ContextID = artifactEvent.ContextID
}
}
}
}

if showRaw {
eventJSON, err := json.MarshalIndent(event, "", " ")
eventJSONFormatted, err := json.MarshalIndent(event, "", " ")
if err != nil {
logger.Error("Failed to marshal event", zap.Error(err))
continue
}
fmt.Printf("📡 Raw Event:\n%s\n\n", eventJSON)
fmt.Printf("📡 Raw Event:\n%s\n\n", eventJSONFormatted)
} else {
eventJSON, err := json.Marshal(event)
if err != nil {
logger.Error("Failed to marshal event", zap.Error(err))
continue
}

var genericEvent map[string]interface{}
if err := json.Unmarshal(eventJSON, &genericEvent); err != nil {
logger.Error("Failed to unmarshal generic event", zap.Error(err))
continue
}

kind, ok := genericEvent["kind"].(string)
if !ok {
fmt.Printf("🔔 Unknown Event (no kind field)\n")
continue
Expand Down Expand Up @@ -827,18 +883,28 @@ var submitStreamingTaskCmd = &cobra.Command{

default:
fmt.Printf("🔔 Unknown Event Type: %s\n", kind)
if showRaw {
eventJSON, err := json.MarshalIndent(event, "", " ")
if err == nil {
fmt.Printf("%s\n", eventJSON)
}
}
}
fmt.Printf("\n")
}
}

fmt.Printf("✅ Streaming completed!\n")
duration := time.Since(startTime)

fmt.Printf("✅ Streaming completed!\n\n")
fmt.Printf("📋 Streaming Summary:\n")
fmt.Printf(" Task ID: %s\n", streamingSummary.TaskID)
fmt.Printf(" Context ID: %s\n", streamingSummary.ContextID)
fmt.Printf(" Final Status: %s\n", streamingSummary.FinalStatus)
fmt.Printf(" Duration: %s\n", duration.Round(time.Millisecond))
fmt.Printf(" Total Events: %d\n", streamingSummary.TotalEvents)
fmt.Printf(" Status Updates: %d\n", streamingSummary.StatusUpdates)
fmt.Printf(" Artifact Updates: %d\n", streamingSummary.ArtifactUpdates)

if streamingSummary.FinalMessage != nil {
fmt.Printf(" Final Message Parts: %d\n", len(streamingSummary.FinalMessage.Parts))
}

fmt.Printf("\n")
return nil
},
}
Loading