# Athena DRF backend tests: Authentication and Journaling

This notebook exercises the Django REST Framework (DRF) API for:
- Authentication (JWT obtain/verify/refresh)
- Health check (`/api/ping/`)
- Submitting a journaling run (`/api/runs/`) and streaming its SSE events
- Journaling WebSocket (`/ws/journal/<session_id>`)

Set `ATHENA_API_BASE_URL` (e.g., `http://localhost:8000`) and optionally `ATHENA_TEST_USERNAME`/`ATHENA_TEST_PASSWORD`. For WebSocket, you can override `ATHENA_WS_URL`.


In [6]:
import os
import json
import time
import uuid
import base64
import requests
import threading
from typing import Optional
import websockets


API_BASE = os.getenv("ATHENA_API_BASE_URL", "http://localhost:8000").rstrip("/")
WS_BASE = os.getenv("ATHENA_WS_URL")  # e.g., ws://localhost:8000
if not WS_BASE:
    # Derive from HTTP base
    WS_BASE = API_BASE.replace("http://", "ws://").replace("https://", "wss://")

TEST_USERNAME = TEST_PASSWORD = 'athena'

print({
    "API_BASE": API_BASE,
    "WS_BASE": WS_BASE,
    "TEST_USERNAME": bool(TEST_USERNAME),
    "websockets_available": websockets is not None,
})


{'API_BASE': 'http://localhost:8000', 'WS_BASE': 'ws://localhost:8000', 'TEST_USERNAME': True, 'websockets_available': True}


In [7]:
# Helper HTTP client with optional JWT

class APIClient:
    def __init__(self, base_url: str):
        self.base_url = base_url.rstrip("/")
        self.access_token: Optional[str] = None
        self.refresh_token: Optional[str] = None

    def _headers(self):
        headers = {"Content-Type": "application/json"}
        if self.access_token:
            headers["Authorization"] = f"Bearer {self.access_token}"
        return headers

    def get(self, path: str, **kwargs):
        url = f"{self.base_url}{path}"
        return requests.get(url, headers=self._headers(), **kwargs)

    def post(self, path: str, json: Optional[dict] = None, **kwargs):
        url = f"{self.base_url}{path}"
        return requests.post(url, headers=self._headers(), json=json, **kwargs)

client = APIClient(API_BASE)
client


<__main__.APIClient at 0x7ec064dae960>

In [8]:
# Health check
r = client.get("/api/ping/")
print(r.status_code, r.text)
r.json()


200 {"status":"ok"}


{'status': 'ok'}

In [9]:
# JWT obtain (pair)

if TEST_USERNAME and TEST_PASSWORD:
    resp = requests.post(
        f"{API_BASE}/api/token/",
        json={"username": TEST_USERNAME, "password": TEST_PASSWORD},
        headers={"Content-Type": "application/json"},
    )
    print(resp.status_code)
    print(resp.text)
    if resp.ok:
        tokens = resp.json()
        client.access_token = tokens.get("access")
        client.refresh_token = tokens.get("refresh")
        print("Access token set:", bool(client.access_token))
        print("Refresh token set:", bool(client.refresh_token))
else:
    print("Skip JWT obtain: set ATHENA_TEST_USERNAME and ATHENA_TEST_PASSWORD to run.")


200
{"refresh":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJ0b2tlbl90eXBlIjoicmVmcmVzaCIsImV4cCI6MTc1NjgzNzg3MywiaWF0IjoxNzU2NzUxNDczLCJqdGkiOiI3MGNmMzcwMzVjNDE0MDJkYTlkZjdmMmM4MjMyMDM2MiIsInVzZXJfaWQiOiIxIn0.SjGrenFgFrBeFpdT8VxH0XD6-un2gVPe4i1eaJJbY6uBhcTwZR6OGGAD-_hIuAQgUp1Qt4ijgt5y6F-FAPoycA","access":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJ0b2tlbl90eXBlIjoiYWNjZXNzIiwiZXhwIjoxNzU2NzUxNzczLCJpYXQiOjE3NTY3NTE0NzMsImp0aSI6ImZlZjU3YmExM2Y4YjRhYWFiYWE3NWMzOWNjNDhmODM4IiwidXNlcl9pZCI6IjEifQ.N86xrF8opcfASVuu_O7gGKmvvv17gQXvpDlQDHC7PTZGJ70CROOZTsNaFhvIG9dh_Uh6Zeo7Z5oTJY1PyB7uRw"}
Access token set: True
Refresh token set: True


In [10]:
# JWT verify

if client.access_token:
    resp = requests.post(
        f"{API_BASE}/api/token/verify/",
        json={"token": client.access_token},
        headers={"Content-Type": "application/json"},
    )
    print(resp.status_code, resp.text)
else:
    print("No access token; run obtain step first or set TEST creds.")


200 {}


In [11]:
# JWT refresh

if client.refresh_token:
    resp = requests.post(
        f"{API_BASE}/api/token/refresh/",
        json={"refresh": client.refresh_token},
        headers={"Content-Type": "application/json"},
    )
    print(resp.status_code, resp.text)
    if resp.ok:
        client.access_token = resp.json().get("access")
        print("Refreshed access token set:", bool(client.access_token))
else:
    print("No refresh token; run obtain step first.")


200 {"access":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJ0b2tlbl90eXBlIjoiYWNjZXNzIiwiZXhwIjoxNzU2NzUxNzg2LCJpYXQiOjE3NTY3NTE0ODYsImp0aSI6IjIwYTdhYTE2NjQ1MjRjNDFhZTI5ZDljOGI0NDRiODA1IiwidXNlcl9pZCI6IjEifQ.eXsUh1sH21voc0lxxaMaSPIxgn6jEBtBwtXJPrxLUZ_0XeHEOK5RpN4eONQ6tBUGJV4MTc5nX92RbDE5GBj2qg"}
Refreshed access token set: True


In [12]:
# Submit a journaling run via REST and print the returned run_id

payload = {
    "agent_id": "journaling",
    "input": {"text": "Quick test entry from notebook."},
    "options": {"sensitive": True, "stream": False},
}

r = client.post("/api/runs/", json=payload)
print(r.status_code, r.text)
run_id = r.json().get("run_id") if r.ok else None
run_id


200 {"run_id":"1da0a35a-3590-40ea-82f9-eb23c7191866","queued":true}


'1da0a35a-3590-40ea-82f9-eb23c7191866'

In [None]:
# Stream SSE for a run (prints lines for ~15 seconds)

import requests

def stream_sse(run_id: str, seconds: int = 15):
    url = f"{API_BASE}/api/runs/{run_id}/events"
    print("SSE:", url)
    with requests.get(url, stream=True) as resp:
        print("Status:", resp.status_code)
        start = time.time()
        for line in resp.iter_lines():
            if time.time() - start > seconds:
                break
            if line:
                print(line.decode("utf-8"))

if run_id:
    stream_sse(run_id, seconds=15)
else:
    print("No run_id; submit a run first.")


In [13]:
# WebSocket journaling session (requires `websockets`)

async def ws_journaling_session(session_id: Optional[str] = None):
    if websockets is None:
        print("websockets not installed; run `pip install websockets`.\nSkipping.")
        return
    sid = session_id or str(uuid.uuid4())
    url = f"{WS_BASE}/ws/journal/{sid}"
    print("Connecting to:", url)
    async with websockets.connect(url) as ws:
        # Upon connect, server starts a run with empty text and will stream messages.
        # Send a sample journaling message payload.
        msg = {"type": "message", "data": {"text": "Hello from notebook WS."}}
        await ws.send(json.dumps(msg))
        for _ in range(10):
            try:
                incoming = await ws.recv()
                print("<-", incoming)
            except Exception as e:
                print("WS error:", e)
                break

# To run in Jupyter: uncomment the following lines in a cell
import asyncio
await ws_journaling_session()


Connecting to: ws://localhost:8000/ws/journal/1322e0cf-930b-4e7c-89ac-716e1edb2301


CancelledError: 