Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ This plugin enables bidirectional audio and video streaming between Pipecat pipe
- **Participant lifecycle events** — react to participants joining, leaving, and subscribing/unsubscribing tracks
- **Interruption handling** — immediate audio buffer flushing when a user interrupts the agent
- **REST helper** — manage users, calls, and authentication tokens via the GetStream API
- **Custom events and messaging** — send and receive arbitrary data between the agent and participants
- **Custom events** — send and receive structured JSON events between the agent and call participants (up to 5KB per event)
- **Clock-based audio pacing** — drift-free real-time audio output

## Installation
Expand Down Expand Up @@ -116,6 +116,24 @@ token = helper.create_token(user_id="demo-user", expiration=3600)
await helper.delete_call(call_type="default", call_id="my-call")
```

## Custom Events

Send structured events to everyone watching the call:

```python
await transport.send_custom_event({"type": "agent_state", "state": "thinking"})
```

Receive events from participants by registering `on_stream_custom_event`:

```python
@transport.event_handler("on_stream_custom_event")
async def on_stream_custom_event(transport, event):
    # The dict passed to send_custom_event() arrives under event["custom"].
    print(f"Got custom event: {event}")
```

Payloads are limited to 5KB. Events are delivered only to clients that are currently watching the call.

## Event Handlers

Register event handlers on the transport to respond to call lifecycle events:
Expand Down Expand Up @@ -146,6 +164,7 @@ async def on_first_participant_joined(transport, participant_id):
| `on_video_track_subscribed` | `participant_id` | Video from a participant is now being received |
| `on_video_track_unsubscribed` | `participant_id` | Video from a participant is no longer being received |
| `on_data_received` | `data`, `participant_id` | Custom data/event received from a participant |
| `on_stream_custom_event` | `event` | Custom event sent to the call (e.g. via `send_custom_event`) by a client watching it; the sender's data is under `event["custom"]` |

## Running the Example

Expand Down
9 changes: 8 additions & 1 deletion example.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# ]
#
# [tool.uv.sources]
# pipecat-getstream = { path = "." }
# pipecat-getstream = { path = ".", editable = true }
# ///

import asyncio
Expand Down Expand Up @@ -126,6 +126,13 @@ async def main():
),
)

@transport.event_handler("on_stream_custom_event")
def on_stream_custom_event(*args, **kwargs):
    """React to custom events sent to the Stream call.

    Placeholder handler: it accepts whatever arguments the transport passes
    and intentionally does nothing with them.
    """
    ...

@transport.event_handler("on_connected")
async def on_connected(*_):
"""
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ classifiers = [
]

requires-python = ">=3.10"
dependencies = ["getstream[webrtc,telemetry]>=3.0.4,<4", "pipecat-ai>=0.0.108"]
dependencies = ["getstream[webrtc,telemetry]>=3.3.0,<4", "pipecat-ai>=0.0.108"]

[project.urls]
Source = "https://github.com/GetStream/pipecat-getstream"
Expand Down
83 changes: 22 additions & 61 deletions src/pipecat_getstream/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
"""

import asyncio
import json
import time
from dataclasses import dataclass
from fractions import Fraction
Expand Down Expand Up @@ -44,6 +43,8 @@
}

try:
import warnings

import av
from aiortc import MediaStreamTrack
from getstream import AsyncStream
Expand All @@ -54,7 +55,6 @@
from getstream.video.rtc.connection_manager import ConnectionManager
from getstream.video.rtc.pb.stream.video.sfu.models.models_pb2 import TrackType
from getstream.video.rtc.tracks import SubscriptionConfig, TrackSubscriptionConfig
import warnings

# Suppress dataclasses_json missing value RuntimeWarnings.
# They pollute the output and cannot be fixed by the users.
Expand Down Expand Up @@ -128,6 +128,7 @@ class GetstreamCallbacks(BaseModel):
on_video_track_unsubscribed: Callable[[str], Coroutine[None, None, None]]
on_data_received: Callable[[bytes, str], Coroutine[None, None, None]]
on_first_participant_joined: Callable[[str], Coroutine[None, None, None]]
on_custom_event: Callable[[dict], Coroutine[None, None, None]]


class PipecatVideoStreamTrack(MediaStreamTrack):
Expand Down Expand Up @@ -380,6 +381,7 @@ async def connect(self):
self._connection.on("track_published")(self._on_track_published)
self._connection.on("track_unpublished")(self._on_track_unpublished)
self._connection.on("call_ended")(self._on_call_ended)
self._connection.on("custom")(self._callbacks.on_custom_event)

# Establish the WebRTC connection
await self._connection.__aenter__()
Expand Down Expand Up @@ -468,21 +470,22 @@ async def disconnect(self):

await self._callbacks.on_disconnected()

async def send_data(self, data: bytes, participant_id: Optional[str] = None):
"""Send custom event data to participants in the call.
async def send_custom_event(self, data: dict):
"""Send a custom event to call participants.

Custom events are only delivered to clients that are watching the call.
The total payload for these events is limited to 5KB in size.

Args:
data: The data bytes to send.
participant_id: Optional specific participant to target.
data: Dictionary of custom event data to send.
"""
if not self._connected or not self._call:
return

try:
custom_data = json.loads(data.decode()) if isinstance(data, bytes) else data
await self._call.send_call_event(user_id=self._user_id, custom=custom_data)
await self._call.send_call_event(user_id=self._user_id, custom=data)
except Exception:
logger.exception("Error sending data")
logger.exception("Error sending custom event")

def get_participants(self) -> List[str]:
"""Get list of participant IDs in the call.
Expand Down Expand Up @@ -1129,7 +1132,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
frame: The frame to process.
direction: The direction of frame flow in the pipeline.
"""
if isinstance(frame, InterruptionFrame) and self._allow_interruptions:
if isinstance(frame, InterruptionFrame):
await self._client.flush_audio()
self._audio_clock = 0.0
self._audio_clock_total = 0.0
Expand Down Expand Up @@ -1159,28 +1162,6 @@ async def cleanup(self):
await super().cleanup()
await self._transport.cleanup()

async def send_message(
    self, frame: OutputTransportMessageFrame | OutputTransportMessageUrgentFrame
):
    """Deliver a transport message frame through the Stream client.

    A dict message is serialized to JSON text first; the resulting string is
    then encoded to bytes. Getstream-specific frames additionally forward
    their ``participant_id`` to the client.

    Args:
        frame: The transport message frame to send.
    """
    payload = frame.message
    if isinstance(payload, dict):
        payload = json.dumps(payload, ensure_ascii=False)
    encoded = payload.encode()

    getstream_frame_types = (
        GetstreamOutputTransportMessageFrame,
        GetstreamOutputTransportMessageUrgentFrame,
    )
    if not isinstance(frame, getstream_frame_types):
        # Generic frames carry no participant target.
        await self._client.send_data(encoded)
        return
    await self._client.send_data(encoded, frame.participant_id)

async def write_audio_frame(self, frame: OutputAudioRawFrame) -> bool:
"""Write an audio frame to the Stream Video call.

Expand Down Expand Up @@ -1292,6 +1273,7 @@ def __init__(
on_video_track_unsubscribed=self._on_video_track_unsubscribed,
on_data_received=self._on_data_received,
on_first_participant_joined=self._on_first_participant_joined,
on_custom_event=self._on_custom_event,
)
self._params = params or GetstreamParams()

Expand Down Expand Up @@ -1320,6 +1302,7 @@ def __init__(
self._register_event_handler("on_first_participant_joined")
self._register_event_handler("on_participant_left")
self._register_event_handler("on_before_disconnect", sync=True)
self._register_event_handler("on_stream_custom_event")

def input(self) -> GetstreamInputTransport:
"""Get the input transport for receiving media and events.
Expand Down Expand Up @@ -1374,10 +1357,13 @@ def get_participants(self) -> List[str]:
async def send_custom_event(self, data: dict):
"""Send a custom event to call participants.

Custom events are only delivered to clients that are watching the call.
The total payload for these events is limited to 5KB in size.

Args:
data: Dictionary of custom event data to send.
"""
await self._client.send_data(json.dumps(data).encode())
await self._client.send_custom_event(data)

async def _on_connected(self):
"""Handle call connected events."""
Expand Down Expand Up @@ -1422,34 +1408,9 @@ async def _on_data_received(self, data: bytes, participant_id: str):
await self._input.push_app_message(data.decode(), participant_id)
await self._call_event_handler("on_data_received", data, participant_id)

async def send_message(self, message: str, participant_id: Optional[str] = None):
    """Send a message to call participants through the output transport.

    Does nothing when ``self._output`` is not set.

    Args:
        message: The message string to send.
        participant_id: Optional specific participant to send to.
    """
    if not self._output:
        return
    await self._output.send_message(
        GetstreamOutputTransportMessageFrame(
            message=message, participant_id=participant_id
        )
    )

async def send_message_urgent(
    self, message: str, participant_id: Optional[str] = None
):
    """Send an urgent message to call participants through the output transport.

    Does nothing when ``self._output`` is not set.

    Args:
        message: The urgent message string to send.
        participant_id: Optional specific participant to send to.
    """
    if not self._output:
        return
    await self._output.send_message(
        GetstreamOutputTransportMessageUrgentFrame(
            message=message, participant_id=participant_id
        )
    )

async def _on_first_participant_joined(self, participant_id: str):
    """Forward a first-participant-joined notification to registered handlers.

    Args:
        participant_id: ID of the participant passed on to the handlers.
    """
    event_name = "on_first_participant_joined"
    await self._call_event_handler(event_name, participant_id)

async def _on_custom_event(self, payload: dict):
    """Forward a custom event payload to ``on_stream_custom_event`` handlers.

    Args:
        payload: The custom event payload passed on to the handlers.
    """
    event_name = "on_stream_custom_event"
    await self._call_event_handler(event_name, payload)
1 change: 1 addition & 0 deletions src/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def _factory() -> "GetstreamCallbacks":
on_audio_track_unsubscribed=AsyncMock(),
on_video_track_subscribed=AsyncMock(),
on_video_track_unsubscribed=AsyncMock(),
on_custom_event=AsyncMock(),
on_data_received=AsyncMock(),
on_first_participant_joined=AsyncMock(),
)
Expand Down
106 changes: 96 additions & 10 deletions src/tests/test_getstream_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,23 @@
import pytest
from dotenv import load_dotenv
from getstream.video.rtc.pb.stream.video.sfu.models.models_pb2 import TrackType
from pipecat.clocks.system_clock import SystemClock
from pipecat.frames.frames import CancelFrame, StartFrame
from pipecat.processors.frame_processor import FrameProcessorSetup
from pipecat.utils.asyncio.task_manager import TaskManager, TaskManagerParams

from pipecat_getstream.transport import (
GetstreamParams,
GetstreamTransport,
GetstreamTransportClient,
)
from pipecat_getstream.utils import GetstreamRESTHelper

load_dotenv()


STREAM_API_KEY = os.environ.get("STREAM_API_KEY")
STREAM_API_SECRET = os.environ.get("STREAM_API_SECRET")
STREAM_API_KEY = os.environ.get("STREAM_API_KEY", "")
STREAM_API_SECRET = os.environ.get("STREAM_API_SECRET", "")
GETSTREAM_INTEGRATION_AVAILABLE = bool(STREAM_API_KEY and STREAM_API_SECRET)


Expand Down Expand Up @@ -99,16 +104,15 @@ async def test_full_participant_session(
reason="Requires STREAM_API_KEY and STREAM_API_SECRET env vars and getstream[webrtc]",
)
@pytest.mark.integration
class TestGetstreamBidirectionalMedia:
"""Real integration test using GetstreamTransportClient.
class TestGetstreamIntegration:
async def test_simultaneous_audio_and_video_bidirectional(self, create_callbacks):
"""Real integration test using GetstreamTransportClient.

The bot connects via the actual transport client (connect/disconnect),
publishes audio+video, and a raw SDK participant verifies reception.
The raw participant also sends media back to verify the transport receives it.
"""
The bot connects via the actual transport client (connect/disconnect),
publishes audio+video, and a raw SDK participant verifies reception.
The raw participant also sends media back to verify the transport receives it.

async def test_simultaneous_audio_and_video_bidirectional(self, create_callbacks):
"""GetstreamTransportClient exchanges audio+video with a real participant."""
"""
from getstream import AsyncStream
from getstream.models import UserRequest
from getstream.video import rtc
Expand Down Expand Up @@ -261,3 +265,85 @@ def on_human_track_added(track_id, kind, user):
finally:
await bot_client.disconnect()
assert not bot_client._connected, "Bot should be disconnected"

async def test_custom_events(self):
    """A custom event sent by one transport is received by another via event_handler.

    Two transports with all media disabled share a freshly created call.
    Transport A keeps sending a ping event until transport B's
    ``on_stream_custom_event`` handler fires; the received payload must carry
    the sent data under its ``custom`` key.
    """
    call_id = f"integration-test-{uuid.uuid4().hex[:8]}"
    user_a_id = "user-a"
    user_b_id = "user-b"

    # Media is irrelevant for this test, so every audio/video path is disabled.
    params = GetstreamParams(
        audio_in_enabled=False,
        audio_out_enabled=False,
        video_in_enabled=False,
        video_out_enabled=False,
    )

    transport_a = GetstreamTransport(
        api_key=STREAM_API_KEY,
        api_secret=STREAM_API_SECRET,
        call_type="default",
        call_id=call_id,
        user_id=user_a_id,
        params=params,
    )
    transport_b = GetstreamTransport(
        api_key=STREAM_API_KEY,
        api_secret=STREAM_API_SECRET,
        call_type="default",
        call_id=call_id,
        user_id=user_b_id,
        params=params,
    )

    received_event = asyncio.Event()
    received_payload: list[dict] = []

    @transport_b.event_handler("on_stream_custom_event")
    async def on_event(_transport, payload):
        received_payload.append(payload)
        received_event.set()

    b_joined = asyncio.Event()

    @transport_a.event_handler("on_first_participant_joined")
    async def on_b_joined(_transport, _participant_id):
        b_joined.set()

    rest = GetstreamRESTHelper(api_key=STREAM_API_KEY, api_secret=STREAM_API_SECRET)
    await rest.create_call(
        call_type="default", call_id=call_id, created_by_id=user_a_id
    )

    # The input transports are driven directly (no pipeline), so they get
    # their clock and task manager from a hand-built FrameProcessorSetup.
    task_manager = TaskManager()
    task_manager.setup(TaskManagerParams(loop=asyncio.get_running_loop()))
    frame_setup = FrameProcessorSetup(
        clock=SystemClock(), task_manager=task_manager
    )

    inputs = []
    for transport in (transport_a, transport_b):
        input_t = transport.input()
        await input_t.setup(frame_setup)
        await input_t.start(StartFrame())
        inputs.append(input_t)

    async def send_until_received():
        # Resend periodically until B reports reception — presumably because
        # an event sent before B is watching the call would be missed.
        while not received_event.is_set():
            await transport_a.send_custom_event({"type": "ping", "msg": "hello"})
            await asyncio.sleep(3)

    try:
        await asyncio.wait_for(b_joined.wait(), timeout=15)
        sender_task = asyncio.create_task(send_until_received())
        try:
            await asyncio.wait_for(received_event.wait(), timeout=30)
        finally:
            sender_task.cancel()
            # Wait for the cancellation to finish so the sender task is not
            # still pending when the transports are torn down;
            # return_exceptions=True collects the CancelledError instead of
            # raising it here.
            await asyncio.gather(sender_task, return_exceptions=True)

        assert "custom" in received_payload[0]
        assert received_payload[0]["custom"]["type"] == "ping"
        assert received_payload[0]["custom"]["msg"] == "hello"
    finally:
        for input_t in inputs:
            await input_t.cancel(CancelFrame())
Loading