
# Python WebSocket Frame Streaming — Full Notebook

This notebook provides a **complete setup** to:
- **Record frames** from a webcam into a folder (filenames are UNIX timestamps, e.g. `1747154380.5511632.png`).
- **Stream frames** over **WebSocket** to a backend (e.g. `ws://HOST:8003/ws/`), one message per frame.
- Control payload fields: `datapt_id`, `state` (`"stream"` / `"end"`), `frame_data` (base64, no prefix), `timestamp`, and `advanced`.
- Match the behavior described in the README you provided.

> Tip: You can run only the sections you need. For **recording**, run the "Install & Imports" then "**Record frames**" section.
> For **streaming**, run "Install & Imports", set **configuration**, then "**Run the streaming client**".



## Install & Imports

This uses:
- `websockets` for WebSocket communication
- `opencv-python` (`cv2`) for recording
- `numpy` for image handling

If you prefer **uv**, you can instead run `uv add websockets opencv-python numpy` in your environment.  
Below we use `pip` for convenience in notebook environments.


In [2]:

# If running locally and missing deps, uncomment this cell:
%pip install websockets opencv-python numpy


Collecting opencv-python
  Downloading opencv_python-4.12.0.88-cp37-abi3-win_amd64.whl.metadata (19 kB)
Collecting numpy
  Using cached numpy-2.2.6-cp310-cp310-win_amd64.whl.metadata (60 kB)
Downloading opencv_python-4.12.0.88-cp37-abi3-win_amd64.whl (39.0 MB)
   ---------------------------------------- 0.0/39.0 MB ? eta -:--:--
   ---------------------------------------- 0.3/39.0 MB ? eta -:--:--
   - -------------------------------------- 1.6/39.0 MB 5.6 MB/s eta 0:00:07
   -- ------------------------------------- 2.9/39.0 MB 5.6 MB/s eta 0:00:07
   ---- ----------------------------------- 4.2/39.0 MB 5.7 MB/s eta 0:00:07
   ------ --------------------------------- 6.3/39.0 MB 6.8 MB/s eta 0:00:05
   -------- ------------------------------- 8.1/39.0 MB 7.1 MB/s eta 0:00:05
   ---------- ----------------------------- 10.2/39.0 MB 7.5 MB/s eta 0:00:04
   ------------ --------------------------- 12.1/39.0 MB 7.6 MB/s eta 0:00:04
   -------------- ------------------------- 14.4/39.0 MB 8

In [3]:

import os
import io
import cv2
import uuid
import time
import json
import base64
import asyncio
import logging
import pathlib
import traceback
from typing import Optional, List, Tuple

import numpy as np
try:
    import websockets
    from websockets.client import connect
except Exception as e:
    raise RuntimeError("websockets is required. Install with `%pip install websockets`.") from e

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger("ws-client")


  from websockets.client import connect



## Configuration

You can set these here, or leave them as `os.getenv(...)` to pull from environment variables.

- `BACKEND_WS_BASE` — e.g. `ws://localhost:8003/ws/`
- `API_KEY` — your API key string (if required by your backend)
- `IMAGES_DIR` — folder containing timestamped `.png` images
- `FPS` — frame send rate
- `FRAME_FORMAT` — `"jpeg"` (default) or `"raw"`
- `ADVANCED` — whether to request advanced signals in responses
- Optional webcam capture parameters for the recorder


In [9]:

# --- Core streaming config ---
BACKEND_WS_BASE = os.getenv("BACKEND_WS_BASE", "ws://localhost:8003/ws/")
API_KEY          = os.getenv("API_KEY", "YOUR_API_KEY")
IMAGES_DIR       = os.getenv("IMAGES_DIR", "images")
FPS              = float(os.getenv("FPS", "30"))
FRAME_FORMAT     = os.getenv("FRAME_FORMAT", "jpeg")  # "jpeg" or "raw"
ADVANCED         = os.getenv("ADVANCED", "1") not in ("0", "false", "False")

# --- Recorder config (used if you record frames here) ---
CAMERA_INDEX   = int(os.getenv("CAMERA_INDEX", "0"))
RES_WIDTH      = int(os.getenv("RES_WIDTH", "640"))
RES_HEIGHT     = int(os.getenv("RES_HEIGHT", "480"))
DURATION_SEC   = float(os.getenv("DURATION_SEC", "10"))
SHOW_PREVIEW   = os.getenv("SHOW_PREVIEW", "1") not in ("0", "false", "False")

# --- Retry / connection tuning ---
CONNECT_TIMEOUT_SEC = float(os.getenv("CONNECT_TIMEOUT_SEC", "10"))
SEND_QUEUE_MAXSIZE  = int(os.getenv("SEND_QUEUE_MAXSIZE", "256"))
PREFILL_QUEUE       = int(os.getenv("PREFILL_QUEUE", "10"))  # frames to pre-encode before sending starts

print("Config loaded:")
print(" BACKEND_WS_BASE =", BACKEND_WS_BASE)
print(" API_KEY         =", "(set)" if API_KEY else "(empty)")
print(" IMAGES_DIR      =", IMAGES_DIR)
print(" FPS             =", FPS)
print(" FRAME_FORMAT    =", FRAME_FORMAT)
print(" ADVANCED        =", ADVANCED)


Config loaded:
 BACKEND_WS_BASE = ws://localhost:8003/ws/
 API_KEY         = (set)
 IMAGES_DIR      = images
 FPS             = 30.0
 FRAME_FORMAT    = jpeg
 ADVANCED        = True



## Helpers — URL and payload constructors


In [10]:

from urllib.parse import urlparse, urlunparse, urlencode, parse_qsl

def build_ws_url(base: str, api_key: Optional[str]) -> str:
    """Attach the API key as a query parameter `api_key` if provided."""
    parts = urlparse(base)
    query = dict(parse_qsl(parts.query))
    if api_key:
        query["api_key"] = api_key
    new_query = urlencode(query)
    new_parts = parts._replace(query=new_query)
    url = urlunparse(new_parts)
    return url

def build_payload(datapt_id: str, frame_b64: str, timestamp_str: str, *, state: str, advanced: bool) -> dict:
    return {
        "datapt_id": datapt_id,
        "state": state,  # "stream" | "end"
        "frame_data": frame_b64,  # base64 string (no prefix)
        "timestamp": timestamp_str,  # string
        "advanced": bool(advanced),
    }



## Record frames (optional)

This replicates `record.py`: captures frames to `IMAGES_DIR` as `.png` files named by UNIX timestamps (with fractional seconds).  
Run this if you need sample frames.


In [11]:

def ensure_dir(path: str):
    pathlib.Path(path).mkdir(parents=True, exist_ok=True)

def record_frames(
    images_dir: str = IMAGES_DIR,
    fps: float = FPS,
    duration_sec: float = DURATION_SEC,
    camera_index: int = CAMERA_INDEX,
    width: int = RES_WIDTH,
    height: int = RES_HEIGHT,
    show_preview: bool = SHOW_PREVIEW,
) -> List[str]:
    ensure_dir(images_dir)
    cap = cv2.VideoCapture(camera_index)
    if not cap.isOpened():
        raise RuntimeError(f"Could not open camera index {camera_index}")
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
    cap.set(cv2.CAP_PROP_FPS, fps)

    written = []
    frame_interval = 1.0 / fps
    end_time = time.time() + duration_sec
    try:
        while time.time() < end_time:
            ts = time.time()
            ok, frame = cap.read()
            if not ok:
                log.warning("Failed to read frame from camera.")
                continue

            if show_preview:
                cv2.imshow("Preview (press q to quit)", frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

            # Save as PNG with timestamp filename
            filename = f"{ts:.10f}.png"  # keep precision
            path = str(pathlib.Path(images_dir) / filename)
            ok = cv2.imwrite(path, frame)
            if ok:
                written.append(path)
            else:
                log.warning("Failed to write frame to %s", path)

            # Pace
            delay = frame_interval - (time.time() - ts)
            if delay > 0:
                time.sleep(delay)
    finally:
        cap.release()
        try:
            cv2.destroyAllWindows()
        except Exception:
            pass
    log.info("Wrote %d frames to %s", len(written), images_dir)
    return written


In [None]:

# Example usage (optional):
# recorded = record_frames()
# len(recorded), recorded[:3]


In [12]:
recorded = record_frames(
    images_dir=IMAGES_DIR,
    fps=FPS,             # 30.0 na sua config
    duration_sec=10,     # ajuste se quiser mais/menos tempo
    camera_index=0,      # troque para 1/2 se tiver mais câmeras
    width=640,
    height=480,
    show_preview=True    # janela do OpenCV; pressione 'q' para parar
)
len(recorded), recorded[:3]


20:37:37 | INFO | ws-client | Wrote 195 frames to images


(195,
 ['images\\1762457846.4537277222.png',
  'images\\1762457847.7506310940.png',
  'images\\1762457847.7850155830.png'])


## Encoder producer

Reads `.png` files from `IMAGES_DIR` (sorted by timestamp filename), encodes each frame to base64.

- If `FRAME_FORMAT == "jpeg"`, converts PNG → JPEG in-memory for compact payloads.
- If `FRAME_FORMAT == "raw"`, sends the original PNG bytes (still base64, but raw PNG).


In [15]:

def list_frame_paths(images_dir: str) -> List[pathlib.Path]:
    p = pathlib.Path(images_dir)
    if not p.exists():
        return []
    files = [f for f in p.iterdir() if f.suffix.lower() == ".png"]
    # Sort by numeric timestamp parsed from filename (fallback to name)
    def ts_key(f: pathlib.Path):
        try:
            return float(f.stem)
        except Exception:
            return f.stem
    return sorted(files, key=ts_key)

def encode_frame_to_base64(img_bgr: np.ndarray, *, to_jpeg: bool) -> str:
    if to_jpeg:
        ok, buf = cv2.imencode(".jpg", img_bgr, [int(cv2.IMWRITE_JPEG_QUALITY), 90])
    else:
        ok, buf = cv2.imencode(".png", img_bgr, [])
    if not ok:
        raise RuntimeError("Failed to encode image.")
    return base64.b64encode(buf.tobytes()).decode("utf-8")

async def encoder_producer(
    images_dir: str,
    queue: asyncio.Queue,
    prefill: int = 10,
    frame_format: str = "jpeg"
):
    paths = list_frame_paths(images_dir)
    if not paths:
        log.warning("No .png frames found in %s", images_dir)
        return

    # Prefill queue (synchronously)
    to_jpeg = (frame_format.lower() == "jpeg")
    for i, path in enumerate(paths):
        # Read PNG
        img = cv2.imread(str(path), cv2.IMREAD_COLOR)
        if img is None:
            log.warning("Failed to read image: %s", path)
            continue

        # Build timestamp string from filename stem
        timestamp_str = path.stem
        frame_b64 = encode_frame_to_base64(img, to_jpeg=to_jpeg)

        await queue.put((frame_b64, timestamp_str, str(path)))
        if queue.qsize() >= prefill:
            break

    # Put remaining frames
    for path in paths[queue.qsize():]:
        img = cv2.imread(str(path), cv2.IMREAD_COLOR)
        if img is None:
            log.warning("Failed to read image: %s", path)
            continue
        timestamp_str = path.stem
        frame_b64 = encode_frame_to_base64(img, to_jpeg=to_jpeg)
        await queue.put((frame_b64, timestamp_str, str(path)))

    # Signal completion
    await queue.put(None)



## WebSocket sender and server listener

- `server_listener` prints every JSON message from the server (handles text frames; binary frames are shown as length only).
- `send_stream` sends one message per frame at the configured `FPS`, and emits a final `"end"` message containing the last frame.


In [16]:

async def server_listener(ws: websockets.WebSocketClientProtocol):
    try:
        async for msg in ws:
            if isinstance(msg, (bytes, bytearray)):
                log.info("<< (binary) %d bytes", len(msg))
                continue
            try:
                payload = json.loads(msg)
            except Exception:
                log.info("<< %s", msg[:500])
            else:
                log.info("<< %s", json.dumps(payload, ensure_ascii=False))
    except asyncio.CancelledError:
        pass
    except Exception as e:
        log.error("Listener error: %s", e)
        log.debug(traceback.format_exc())


  async def server_listener(ws: websockets.WebSocketClientProtocol):


In [17]:

async def send_stream(
    ws: websockets.WebSocketClientProtocol,
    queue: asyncio.Queue,
    datapt_id: str,
    fps: float,
    advanced: bool
):
    frame_interval = 1.0 / fps
    last = None
    t0 = time.time()
    index = 0

    while True:
        item = await queue.get()
        if item is None:
            break
        frame_b64, timestamp_str, _path = item
        last = (frame_b64, timestamp_str)

        # Build "stream" payload
        payload = build_payload(
            datapt_id=datapt_id,
            frame_b64=frame_b64,
            timestamp_str=timestamp_str,
            state="stream",
            advanced=advanced,
        )
        await ws.send(json.dumps(payload))
        log.info(">> stream frame %d @ %s", index, timestamp_str)

        # Pace to FPS
        index += 1
        elapsed = time.time() - t0
        target = index * frame_interval
        delay = target - elapsed
        if delay > 0:
            await asyncio.sleep(delay)

    # Send final "end" payload with last frame
    if last is not None:
        frame_b64, timestamp_str = last
        end_payload = build_payload(
            datapt_id=datapt_id,
            frame_b64=frame_b64,
            timestamp_str=timestamp_str,
            state="end",
            advanced=advanced,
        )
        await ws.send(json.dumps(end_payload))
        log.info(">> end @ %s", timestamp_str)


  ws: websockets.WebSocketClientProtocol,



## Main runner

Creates the queue, launches the encoder producer and server listener, and streams frames.


In [18]:

async def run_client(
    backend_ws_base: str = BACKEND_WS_BASE,
    api_key: Optional[str] = API_KEY,
    images_dir: str = IMAGES_DIR,
    fps: float = FPS,
    advanced: bool = ADVANCED,
    frame_format: str = FRAME_FORMAT,
):
    # Prepare queue and encode frames
    queue: asyncio.Queue = asyncio.Queue(maxsize=SEND_QUEUE_MAXSIZE)
    encoder = asyncio.create_task(encoder_producer(images_dir, queue, prefill=PREFILL_QUEUE, frame_format=frame_format))

    # Compose URL
    ws_url = build_ws_url(backend_ws_base, api_key)
    log.info("Connecting to %s", ws_url)

    # Connect and run
    async with connect(ws_url, open_timeout=CONNECT_TIMEOUT_SEC) as ws:
        datapt_id = str(uuid.uuid4())
        listener = asyncio.create_task(server_listener(ws))
        try:
            await send_stream(ws, queue, datapt_id=datapt_id, fps=fps, advanced=advanced)
        finally:
            listener.cancel()
            with contextlib.suppress(Exception):
                await listener


In [21]:

import contextlib

# Convenience wrapper to run from a notebook cell
import asyncio

def run_event_loop(coro):
    """
    Safe wrapper to run an async coroutine in notebooks (works even if a loop is already running).
    """
    try:
        return asyncio.run(coro)
    except RuntimeError:
        import nest_asyncio
        nest_asyncio.apply()
        loop = asyncio.get_event_loop()
        return loop.run_until_complete(coro)




## (Optional) Generate dummy frames

If `IMAGES_DIR` is empty, use this cell to generate a few simple PNG frames with timestamped filenames.


In [None]:

def generate_dummy_frames(images_dir: str = IMAGES_DIR, count: int = 30, width: int = RES_WIDTH, height: int = RES_HEIGHT):
    ensure_dir(images_dir)
    for i in range(count):
        ts = time.time()
        img = np.zeros((height, width, 3), dtype=np.uint8)
        # Draw a moving rectangle
        x = int((i / max(1, count - 1)) * (width - 50))
        cv2.rectangle(img, (x, 50), (x + 50, 100), (0, 255, 0), -1)
        cv2.putText(img, f"{ts:.6f}", (10, height - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1, cv2.LINE_AA)
        filename = f"{ts:.10f}.png"
        path = str(pathlib.Path(images_dir) / filename)
        cv2.imwrite(path, img)
        time.sleep(0.02)
    log.info("Dummy frames generated in %s", images_dir)

# Example (optional):
# generate_dummy_frames(count=60)



## Run the streaming client

Ensure your backend is reachable (e.g. `ws://localhost:8003/ws/`).  
If you need to pass the API key through headers instead of querystring, adapt the `connect(...)` call to include `extra_headers`.


In [28]:

# Run the client (make sure IMAGES_DIR has .png frames, e.g. after recording or generating dummy frames)
# If you need advanced data disabled, set ADVANCED = False above.
run_event_loop(run_client())


21:03:51 | INFO | ws-client | Connecting to ws://localhost:8003/ws/?api_key=YOUR_API_KEY
21:03:54 | INFO | websockets.server | connection open
21:03:54 | ERROR | websockets.server | connection handler failed
Traceback (most recent call last):
  File "c:\Users\giuli\Downloads\python_demo-20251106T184058Z-1-001\python_demo\.venv\lib\site-packages\websockets\asyncio\server.py", line 376, in conn_handler
    await self.handler(connection)
TypeError: start_test_ws_server.<locals>.handler() missing 1 required positional argument: 'path'
21:03:54 | INFO | ws-client | >> stream frame 0 @ 1762457846.4537277222
21:03:54 | ERROR | ws-client | Listener error: received 1011 (internal error); then sent 1011 (internal error)


ConnectionClosedError: received 1011 (internal error); then sent 1011 (internal error)


## (Optional) One-click: record then stream

This cell records for `DURATION_SEC` at `FPS` and immediately streams the results.


In [None]:

# record_frames(images_dir=IMAGES_DIR, fps=FPS, duration_sec=DURATION_SEC, camera_index=CAMERA_INDEX,
#               width=RES_WIDTH, height=RES_HEIGHT, show_preview=SHOW_PREVIEW)
# run_event_loop(run_client())



## Notes & Troubleshooting

- **API key passing**: By default this notebook appends `?api_key=YOUR_API_KEY` to the WS URL.
  If your server expects a header (e.g. `Authorization: Bearer ...`), replace the `connect(...)` call with:
  ```python
  async with connect(ws_url, open_timeout=CONNECT_TIMEOUT_SEC, extra_headers={"Authorization": f"Bearer {API_KEY}"}) as ws:
      ...
  ```
- **`FRAME_FORMAT`**:
  - `"jpeg"`: The client converts frames to JPEG and base64-encodes (smaller payload).
  - `"raw"`: The client keeps frames as PNG before base64-encoding.
- **Datapoint ID**: A single `uuid4` is generated per session and used for all frames.
- **States**:
  - `"stream"` for regular frames
  - `"end"` for the final message (server can use this to finalize inference).
- **Windows camera**: If your camera can’t open at specified resolution, try removing explicit `CAP_PROP_*` sets.
- **Backpressure**: The producer encodes frames into an asyncio queue. `PREFILL_QUEUE` ensures we start with a buffer.
- **Performance**: For large images or high FPS, consider a dedicated encoder pool and binary WebSocket frames.


In [26]:
import asyncio, threading, json, websockets

def start_test_ws_server(host="127.0.0.1", port=8003):
    async def handler(ws):
        # Em versões recentes, o 'path' vem como atributo do objeto
        path = getattr(ws, "path", "/")
        if path not in ("/ws", "/ws/"):
            await ws.close(code=1008, reason="Invalid path")
            return

        async for msg in ws:
            try:
                payload = json.loads(msg)
            except Exception:
                payload = {"raw": msg}

            # resposta intermediária "ok"
            await ws.send(json.dumps({
                "state": "ok",
                "socket_id": "localtest",
                "datapt_id": payload.get("datapt_id", "test"),
                "inference": {"hr": 66},
                "advanced": {"rppg": [0.1, 0.2], "rppg_timestamps": [1.0, 1.03]},
                "confidence": {},
                "feedback": None,
                "model_version": "HR"
            }))

            # ao receber "end", envia "finished"
            if payload.get("state") == "end":
                await ws.send(json.dumps({
                    "state": "finished",
                    "socket_id": "localtest",
                    "datapt_id": payload.get("datapt_id", "test"),
                    "inference": {"hr": 65},
                    "advanced": None,
                    "confidence": {},
                    "feedback": None,
                    "model_version": "HR"
                }))

    async def main():
        async with websockets.serve(handler, host, port):
            await asyncio.Future()  # roda para sempre

    t = threading.Thread(target=lambda: asyncio.run(main()), daemon=True)
    t.start()
    return t

_ = start_test_ws_server()
print("Servidor WS de teste ativo em ws://127.0.0.1:8003/ws/")


Servidor WS de teste ativo em ws://127.0.0.1:8003/ws/


Exception in thread Thread-7 (<lambda>):
Traceback (most recent call last):
  File "C:\Users\giuli\AppData\Local\Programs\Python\Python310\lib\threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "C:\Users\giuli\AppData\Local\Programs\Python\Python310\lib\threading.py", line 953, in run


    self._target(*self._args, **self._kwargs)
  File "C:\Users\giuli\AppData\Local\Temp\ipykernel_10900\613836518.py", line 46, in <lambda>
  File "c:\Users\giuli\Downloads\python_demo-20251106T184058Z-1-001\python_demo\.venv\lib\site-packages\nest_asyncio.py", line 30, in run
    return loop.run_until_complete(task)
  File "c:\Users\giuli\Downloads\python_demo-20251106T184058Z-1-001\python_demo\.venv\lib\site-packages\nest_asyncio.py", line 98, in run_until_complete
    return f.result()
  File "C:\Users\giuli\AppData\Local\Programs\Python\Python310\lib\asyncio\futures.py", line 201, in result
    raise self._exception.with_traceback(self._exception_tb)
  File "C:\Users\giuli\AppData\Local\Programs\Python\Python310\lib\asyncio\tasks.py", line 232, in __step
    result = coro.send(None)
  File "C:\Users\giuli\AppData\Local\Temp\ipykernel_10900\613836518.py", line 43, in main
  File "c:\Users\giuli\Downloads\python_demo-20251106T184058Z-1-001\python_demo\.venv\lib\site-packages\websocke

In [27]:
# (mantenha a URL)
BACKEND_WS_BASE = "ws://127.0.0.1:8003/ws/"
run_event_loop(run_client())


20:55:01 | INFO | ws-client | Connecting to ws://localhost:8003/ws/?api_key=YOUR_API_KEY
20:55:04 | INFO | websockets.server | connection open
20:55:04 | ERROR | websockets.server | connection handler failed
Traceback (most recent call last):
  File "c:\Users\giuli\Downloads\python_demo-20251106T184058Z-1-001\python_demo\.venv\lib\site-packages\websockets\asyncio\server.py", line 376, in conn_handler
    await self.handler(connection)
TypeError: start_test_ws_server.<locals>.handler() missing 1 required positional argument: 'path'
20:55:04 | INFO | ws-client | >> stream frame 0 @ 1762457846.4537277222
20:55:04 | ERROR | ws-client | Listener error: received 1011 (internal error); then sent 1011 (internal error)
  m = tuple(map(os.fspath, m))


ConnectionClosedError: received 1011 (internal error); then sent 1011 (internal error)