Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions agents-core/vision_agents/core/agents/agent_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,13 @@ async def warmup(self) -> None:

# Create a dry-run Agent instance and warmup its components for the first time.
agent: "Agent" = await await_or_run(self._create_agent)
logger.info("Warming up agent components...")
await self._warmup_agent(agent)
self._warmed_up = True

logger.info("Agent warmup completed")
try:
logger.info("Warming up agent components...")
await self._warmup_agent(agent)
self._warmed_up = True
logger.info("Agent warmup completed")
finally:
await agent.close()

@property
def warmed_up(self) -> bool:
Expand Down
1 change: 1 addition & 0 deletions agents-core/vision_agents/core/agents/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,7 @@ async def _close(self):
self._call_ended_event = None
self._joined_at = 0.0
self.clear_call_logging_context()
self.events.stop()
self._closed = True
self.logger.info("🤖 Agent stopped")

Expand Down
24 changes: 9 additions & 15 deletions agents-core/vision_agents/core/events/__init__.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,3 @@
from .base import (
ConnectionState,
AudioFormat,
BaseEvent,
PluginBaseEvent,
PluginInitializedEvent,
PluginClosedEvent,
PluginErrorEvent,
VideoProcessorDetectionEvent,
)
from .manager import EventManager

from getstream.models import (
BlockedUserEvent,
CallAcceptedEvent,
Expand Down Expand Up @@ -65,6 +53,15 @@
UpdatedCallPermissionsEvent,
)

from .base import (
AudioFormat,
BaseEvent,
ConnectionState,
PluginBaseEvent,
VideoProcessorDetectionEvent,
)
from .manager import EventManager

__all__ = [
"BlockedUserEvent",
"CallAcceptedEvent",
Expand Down Expand Up @@ -125,9 +122,6 @@
"AudioFormat",
"BaseEvent",
"PluginBaseEvent",
"PluginInitializedEvent",
"PluginClosedEvent",
"PluginErrorEvent",
"VideoProcessorDetectionEvent",
"EventManager",
]
84 changes: 3 additions & 81 deletions agents-core/vision_agents/core/events/base.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import uuid
import dataclasses
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional
from types import FunctionType
from dataclasses_json import DataClassJsonMixin
from typing import Any, Optional

from dataclasses_json import DataClassJsonMixin
from getstream.video.rtc.pb.stream.video.sfu.models.models_pb2 import Participant


Expand Down Expand Up @@ -54,91 +54,13 @@ class PluginBaseEvent(BaseEvent):
plugin_version: str | None = None


@dataclass
class PluginInitializedEvent(PluginBaseEvent):
"""Event emitted when a plugin is successfully initialized."""

type: str = field(default="plugin.initialized", init=False)
plugin_type: Optional[str] = None
provider: Optional[str] = None
configuration: Optional[Dict[str, Any]] = None
capabilities: Optional[List[str]] = None


@dataclass
class PluginClosedEvent(PluginBaseEvent):
"""Event emitted when a plugin is closed."""

type: str = field(default="plugin.closed", init=False)
plugin_type: Optional[str] = None # "STT", "STS", "VAD"
provider: Optional[str] = None
reason: Optional[str] = None
cleanup_successful: bool = True


@dataclass
class PluginErrorEvent(PluginBaseEvent):
"""Event emitted when a generic plugin error occurs."""

type: str = field(default="plugin.error", init=False)
plugin_type: Optional[str] = None # "STT", "TTS", "STS", "VAD"
provider: Optional[str] = None
error: Optional[Exception] = None
error_code: Optional[str] = None
context: Optional[str] = None
is_fatal: bool = False

@property
def error_message(self) -> str:
return str(self.error) if self.error else "Unknown error"


@dataclasses.dataclass
class ExceptionEvent:
exc: Exception
handler: FunctionType
type: str = "base.exception"


@dataclasses.dataclass
class HealthCheckEvent(DataClassJsonMixin):
connection_id: str
created_at: int
custom: dict
type: str = "health.check"


@dataclass
class ConnectionOkEvent(BaseEvent):
"""Event emitted when WebSocket connection is established."""

type: str = field(default="connection.ok", init=False)
connection_id: Optional[str] = None
server_time: Optional[str] = None
api_key: Optional[str] = None
user_id: Optional[str] = None # type: ignore[assignment]


@dataclass
class ConnectionErrorEvent(BaseEvent):
"""Event emitted when WebSocket connection encounters an error."""

type: str = field(default="connection.error", init=False)
error_code: Optional[str] = None
error_message: Optional[str] = None
reconnect_attempt: Optional[int] = None


@dataclass
class ConnectionClosedEvent(BaseEvent):
"""Event emitted when WebSocket connection is closed."""

type: str = field(default="connection.closed", init=False)
code: Optional[int] = None
reason: Optional[str] = None
was_clean: bool = False


@dataclass
class VideoProcessorDetectionEvent(PluginBaseEvent):
"""Base event for video processor detection results.
Expand Down
23 changes: 8 additions & 15 deletions agents-core/vision_agents/core/events/manager.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,12 @@
import asyncio
import uuid
import collections
import logging
import types
import typing
import uuid
from typing import Any, Deque, Dict, Optional, Union, get_args, get_origin

from .base import (
ConnectionClosedEvent,
ConnectionErrorEvent,
ConnectionOkEvent,
ExceptionEvent,
HealthCheckEvent,
)

from .base import ExceptionEvent

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -145,10 +138,6 @@ def __init__(self, ignore_unknown_events: bool = True):
self._received_event = asyncio.Event()

self.register(ExceptionEvent)
self.register(HealthCheckEvent)
self.register(ConnectionOkEvent)
self.register(ConnectionErrorEvent)
self.register(ConnectionClosedEvent)

# Start background processing task
self._start_processing_task()
Expand Down Expand Up @@ -195,8 +184,7 @@ def register(self, event_class, ignore_not_compatible=False):

def merge(self, em: "EventManager"):
# Stop the processing task in the merged manager
if em._processing_task and not em._processing_task.done():
em._processing_task.cancel()
em.stop()

# Merge all data from the other manager
self._events.update(em._events)
Expand Down Expand Up @@ -559,3 +547,8 @@ async def _process_single_event(self, event):
loop = asyncio.get_running_loop()
handler_task = loop.create_task(self._run_handler(handler, event))
self._handler_tasks[uuid.uuid4()] = handler_task

def stop(self):
if self._processing_task and not self._processing_task.done():
self._processing_task.cancel()
self._processing_task = None
59 changes: 37 additions & 22 deletions agents-core/vision_agents/core/runner/runner.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import logging
import os
import warnings
from typing import Optional
from uuid import uuid4
Expand All @@ -26,10 +27,6 @@

logger = logging.getLogger(__name__)

# TODO:
# - Figure out how to serialize the agent config into some dict
# - Docs

asyncio_logger = logging.getLogger("asyncio")


Expand Down Expand Up @@ -127,12 +124,15 @@ async def _run():
# Start the agent launcher.
await self._launcher.start()

# Create the agent
agent = await self._launcher.launch()

logger.info("✅ Agent warmed up and ready")

# Join call if join_call function is provided
logger.info(f"📞 Joining call: {call_type}/{call_id}")
session = await self._launcher.start_session(
call_id, call_type, video_track_override_path=video_track_override
)
# Open demo UI by default
agent = session.agent
if (
not no_demo
and hasattr(agent, "edge")
Expand All @@ -141,11 +141,6 @@ async def _run():
logger.info("🌐 Opening demo UI...")
await agent.edge.open_demo_for_agent(agent, call_type, call_id)

# Join call if join_call function is provided
logger.info(f"📞 Joining call: {call_type}/{call_id}")
session = await self._launcher.start_session(
call_id, call_type, video_track_override_path=video_track_override
)
await session.wait()
except asyncio.CancelledError:
logger.info("The session is cancelled, shutting down gracefully...")
Expand Down Expand Up @@ -177,18 +172,17 @@ def serve(
port: int = 8000,
agents_log_level: str = "INFO",
http_log_level: str = "INFO",
):
debug: bool = False,
) -> None:
"""
Start the HTTP server that spawns agents to the calls.

Args:
host:
port:
agents_log_level:
http_log_level:

Returns:

host: Host address to bind the server to.
port: Port number for the server.
agents_log_level: Logging level for agent-related logs.
http_log_level: Logging level for FastAPI and uvicorn logs.
debug: Enable asyncio debug mode.
"""
# Configure loggers if they're not already configured
configure_sdk_logger(
Expand All @@ -203,9 +197,22 @@ def serve(
warnings.filterwarnings(
"ignore", category=RuntimeWarning, module="dataclasses_json.core"
)

# Enable asyncio debug via environment variable before uvicorn creates its loop
if debug:
os.environ.setdefault("PYTHONASYNCIODEBUG", "1")
uvicorn.run(self.fast_api, host=host, port=port, log_config=None)

def _create_fastapi_app(self, options: ServeOptions) -> FastAPI:
"""
Create and configure a FastAPI application for serving agents.

Args:
options: Configuration options for the server.

Returns:
Configured FastAPI application instance.
"""
app = FastAPI(lifespan=lifespan)
app.state.launcher = self._launcher
app.state.options = self._serve_options
Expand All @@ -228,9 +235,9 @@ def _create_fastapi_app(self, options: ServeOptions) -> FastAPI:
)
return app

def cli(self):
def cli(self) -> None:
"""
Run the CLI
Run the command-line interface with `run` and `serve` subcommands.
"""

@click.group()
Expand Down Expand Up @@ -326,11 +333,18 @@ def run_cmd(
default="INFO",
help="Set the logging level for FastAPI and uvicorn",
)
@click.option(
"--debug",
is_flag=True,
default=False,
help="Enable asyncio debug mode",
)
def serve_cmd(
host: str,
port: int,
agents_log_level: str,
http_log_level: str,
debug: bool,
) -> None:
"""
Start the HTTP server that spawns agents to the calls.
Expand All @@ -340,6 +354,7 @@ def serve_cmd(
port=port,
agents_log_level=agents_log_level.upper(),
http_log_level=http_log_level.upper(),
debug=debug,
)

cli_()
10 changes: 0 additions & 10 deletions agents-core/vision_agents/core/tts/tts.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import av
from vision_agents.core.events import (
AudioFormat,
PluginClosedEvent,
)
from vision_agents.core.events.manager import EventManager

Expand Down Expand Up @@ -282,12 +281,3 @@ async def send(

async def close(self):
"""Close the TTS service and release any resources."""
self.events.send(
PluginClosedEvent(
session_id=self.session_id,
plugin_name=self.provider_name,
plugin_type="TTS",
provider=self.provider_name,
cleanup_successful=True,
)
)
Loading