In [2]:
import json
import dataclasses
import logging
from typing import AsyncGenerator

# 1. Custom JSON Encoder for dataclasses
class JSONEncoder(json.JSONEncoder):
    """JSON encoder that also knows how to serialize dataclass instances.

    Dataclass instances are converted with ``dataclasses.asdict``; every other
    object is handed to ``json.JSONEncoder.default``.
    """

    def default(self, o):
        """Return a JSON-serializable stand-in for ``o``.

        ``dataclasses.is_dataclass`` is true for dataclass *types* as well as
        instances, so classes are explicitly routed to the base implementation.
        """
        # Guard clause: anything that is a class, or is not a dataclass at all,
        # is not ours to convert.
        if isinstance(o, type) or not dataclasses.is_dataclass(o):
            return super().default(o)
        return dataclasses.asdict(o)

# 2. Example dataclass
@dataclasses.dataclass
class UserEvent:
    """Example event record: a user id paired with an action name."""

    user_id: int  # numeric identifier of the user the event refers to
    action: str  # action name; the sample stream uses values such as "login"

# 3. Error formatting
def error_dict(error):
    """Wrap an error in a one-key dict: ``{"error": <str(error)>}``."""
    message = str(error)
    return dict(error=message)

# 4. Simulated async generator that produces events
async def generate_events() -> AsyncGenerator[dict, None]:
    """Yield two sample events, then fail with ``ValueError``.

    The first event is a ``UserEvent`` dataclass instance and the second is a
    plain dict. After both have been yielded the generator raises to simulate
    a failure in the middle of a stream.

    NOTE(review): the return annotation says ``dict`` although the first item
    is a ``UserEvent`` instance.
    """
    sample_events = (
        UserEvent(user_id=1, action="login"),
        {"user_id": 2, "action": "logout"},
    )
    for sample in sample_events:
        yield sample
    raise ValueError("Something went wrong!")  # Simulate an error

# 5. NDJSON formatter (your main block)
async def format_as_ndjson(r: AsyncGenerator[dict, None]) -> AsyncGenerator[str, None]:
    """Turn a stream of events into newline-delimited JSON (NDJSON) lines.

    Each item from ``r`` is serialized with ``JSONEncoder`` and terminated
    with ``"\\n"``. If an ``Exception`` is raised while consuming ``r`` or
    while serializing an item, it is logged with its traceback and a final
    ``error_dict(...)`` line is emitted; the stream then ends normally.

    Args:
        r: Async generator producing JSON-serializable events.

    Yields:
        One JSON document per line, each ending with a newline.
    """
    try:
        async for event in r:
            yield json.dumps(event, ensure_ascii=False, cls=JSONEncoder) + "\n"
    except Exception as error:
        logging.exception("Exception while generating response stream: %s", error)
        # Use the same ensure_ascii setting as the event lines so non-ASCII
        # error messages are not escaped differently from the rest of the stream.
        yield json.dumps(error_dict(error), ensure_ascii=False) + "\n"

# 6. Running and displaying the output in Jupyter
async def print_ndjson_stream():
    """Print every NDJSON line produced from the sample event stream."""
    ndjson_lines = format_as_ndjson(generate_events())
    async for ndjson_line in ndjson_lines:
        # Each line already carries its trailing "\n", so suppress print's own.
        print(ndjson_line, end="")

# In Jupyter: top-level `await` works directly in a cell.
# In a plain Python script there is no top-level await; use
# asyncio.run(print_ndjson_stream()) there instead.
await print_ndjson_stream()


ERROR:root:Exception while generating response stream: Something went wrong!
Traceback (most recent call last):
  File "/tmp/ipykernel_84796/2622394548.py", line 32, in format_as_ndjson
    async for event in r:
  File "/tmp/ipykernel_84796/2622394548.py", line 27, in generate_events
    raise ValueError("Something went wrong!")  # Simulate an error
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: Something went wrong!


{"user_id": 1, "action": "login"}
{"user_id": 2, "action": "logout"}
{"error": "Something went wrong!"}


In [None]:
# Azure Easy Auth Test Notebook

import requests
from urllib.parse import urljoin

# Base URL of the deployed app (e.g., "https://your-app.azurewebsites.net/").
# Replace the placeholder before running.
BASE_URL = "https://YOUR_APP_URL.azurewebsites.net/"

# 1. Build and show the login URL. The first login is manual: it has to be
#    completed in a browser. Swap 'aad' for your provider if it differs.
login_url = urljoin(BASE_URL, "/.auth/login/aad")
print(
    "Open this URL in your browser, log in, and copy the cookies back here:",
    login_url,
    sep="\n",
)

# 2. Session cookies copied from the browser's dev tools (for the app domain)
#    after logging in.
cookies = {
    # Example: 'AppServiceAuthSession': 'eyJ...'
    # Fill in your session cookie(s) here!
}

# 3. Call /.auth/me with your session cookie
me_url = urljoin(BASE_URL, "/.auth/me")
# Always pass a timeout: by default requests waits indefinitely, which would
# leave the cell hanging if the server never answers.
response = requests.get(me_url, cookies=cookies, timeout=30)

print("Status code:", response.status_code)
print("Response JSON:")
try:
    print(response.json())
except ValueError:
    # response.json() raises a ValueError subclass when the body is not JSON
    # (this may happen, e.g., with an empty or HTML body when the session
    # cookie is missing or expired). Show the raw body instead of crashing.
    print("(response body is not valid JSON; raw body follows)")
    print(response.text)