# GPT-5 Model Access Test Notebook (Azure OpenAI / Foundry)

This notebook provides minimal smoke tests for calling an Azure OpenAI deployment (for example, a GPT-5 deployment) through three mechanisms:

1. Raw HTTPS (Responses API) using `aiohttp`
2. OpenAI Python SDK (configured for Azure endpoint)
3. Azure Foundry / Inference SDK (`azure-ai-inference`)

## Imports & Environment Variables
We will import:
- `os` for environment access
- `dotenv` to load a local `.env` if present
- `azure.identity` `DefaultAzureCredential` for Entra ID auth
- `aiohttp` for raw REST calls
- `openai` SDK for Responses API abstractions
- `azure.ai.inference` for Foundry SDK access (the setup cell imports `ChatCompletionsClient`)

### Required Environment Variables (must be set before running code cells)
- `AZURE_OPENAI_ENDPOINT` (e.g. https://my-resource.openai.azure.com)
- `AZURE_OPENAI_API_VERSION` (e.g. 2024-06-01; use a version that supports the Responses API on your resource)
- `AZURE_OPENAI_MODEL` (deployment or model name)
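
The setup cell raises a single error if any of these variables is unset. As a minimal sketch, the check could also report which ones are missing. The helper name `missing_env_vars` and the `REQUIRED_VARS` tuple are hypothetical and not part of the notebook; the function accepts any mapping so it can be tried without touching the real environment.

```python
import os
from typing import Mapping, Sequence

# The three variables this notebook expects (taken from the list above).
REQUIRED_VARS = (
    "AZURE_OPENAI_ENDPOINT",
    "AZURE_OPENAI_API_VERSION",
    "AZURE_OPENAI_MODEL",
)


def missing_env_vars(
    env: Mapping[str, str] = os.environ,
    required: Sequence[str] = REQUIRED_VARS,
) -> list[str]:
    """Return the required variable names that are unset or blank in `env`."""
    return [name for name in required if not env.get(name, "").strip()]


if __name__ == "__main__":
    missing = missing_env_vars()
    if missing:
        print("Missing environment variables:", ", ".join(missing))
    else:
        print("All required environment variables are set.")
```

Treating whitespace-only values as missing is a design choice of this sketch; the setup cell itself only checks for empty or absent values.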

Authentication: We'll use `DefaultAzureCredential()`, which tries several auth methods in turn. The identity it resolves to must have access to the Azure OpenAI resource. A token for scope `https://cognitiveservices.azure.com/.default` will be requested.
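
Entra ID tokens expire, so a token fetched once can go stale in a long-lived notebook session. The sketch below shows one way to wrap a credential in a callable that re-requests the token shortly before expiry. It assumes only that the credential has a `get_token(scope)` method returning an object with `token` and `expires_on` attributes, as `DefaultAzureCredential` does. The name `make_token_provider` and the `skew_seconds` and `now` parameters are hypothetical.

```python
import time
from typing import Any, Callable

SCOPE = "https://cognitiveservices.azure.com/.default"


def make_token_provider(
    credential: Any,
    scope: str = SCOPE,
    skew_seconds: int = 300,
    now: Callable[[], float] = time.time,
) -> Callable[[], str]:
    """Return a zero-argument callable that yields a currently valid token string.

    The token is cached and requested again once fewer than `skew_seconds`
    remain before its `expires_on` timestamp (seconds since the epoch).
    """
    cached = None

    def provider() -> str:
        nonlocal cached
        if cached is None or cached.expires_on - now() <= skew_seconds:
            cached = credential.get_token(scope)
        return cached.token

    return provider


# Hypothetical usage in this notebook (requires azure-identity):
#   from azure.identity import DefaultAzureCredential
#   get_bearer = make_token_provider(DefaultAzureCredential())
#   headers = {"Authorization": f"Bearer {get_bearer()}"}
```

The `azure.identity` package also ships `get_bearer_token_provider`, which serves the same purpose, and a callable of this shape can be passed to `openai.AzureOpenAI` as `azure_ad_token_provider` instead of a fixed `azure_ad_token`.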

Run the next cell to set up shared variables and credential.

In [None]:
# Setup imports, environment, and credential
import os
from typing import Any, Dict
from dotenv import load_dotenv
from azure.identity import DefaultAzureCredential
import asyncio
import aiohttp
import json
import openai
# Azure AI Inference / Foundry SDK (beta); optional for the cells in this section
try:
    from azure.ai.inference import ChatCompletionsClient
    from azure.ai.inference.models import UserMessage, SystemMessage, AssistantMessage
    _FOUND_CLIENT_TYPE = 'chat'
except ImportError:
    # Package missing or incompatible: record that so the Foundry test can be skipped
    ChatCompletionsClient = None  # type: ignore
    UserMessage = SystemMessage = AssistantMessage = None  # type: ignore
    _FOUND_CLIENT_TYPE = None

load_dotenv(override=False)
endpoint = os.environ.get('AZURE_OPENAI_ENDPOINT')
api_version = os.environ.get('AZURE_OPENAI_API_VERSION')
model = os.environ.get('AZURE_OPENAI_MODEL')
if not all([endpoint, api_version, model]):
    raise RuntimeError('One or more required environment variables are missing: AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_API_VERSION, AZURE_OPENAI_MODEL')

credential = DefaultAzureCredential()
token = credential.get_token('https://cognitiveservices.azure.com/.default')
bearer = token.token
print('Environment and credential ready.')

## Raw Responses API Test (aiohttp)
This cell makes a direct POST call to the Azure OpenAI Responses endpoint and reads the reply as a server-sent events (SSE) stream.
Endpoint pattern: `{AZURE_OPENAI_ENDPOINT}/openai/responses?api-version={AZURE_OPENAI_API_VERSION}`
Headers: Bearer token from Entra ID, `Content-Type: application/json`, and `Accept: text/event-stream`.
Payload: the deployment name as `model`, a short prompt as `input`, and `stream: true`.
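
To make the stream handling easier to follow, here is a small offline sketch of the same parsing idea applied to a list of strings instead of a live HTTP response. The function name `collect_output_text` and the sample lines are hypothetical: the events are hand-written for illustration, not captured from the service, and they use only the `response.output_text.delta` and `response.completed` event types that the cell below handles.

```python
import json
from typing import Iterable


def collect_output_text(lines: Iterable[str]) -> dict:
    """Accumulate text deltas from SSE `data:` lines and keep the final response."""
    deltas: list[str] = []
    completed = None
    for raw in lines:
        line = raw.strip()
        if not line.startswith("data:"):
            continue  # skip blank keep-alives and `event:` lines
        data = line[len("data:"):].strip()
        if data == "[DONE]":
            break
        try:
            event = json.loads(data)
        except json.JSONDecodeError:
            continue  # ignore payloads that are not valid JSON
        if not isinstance(event, dict):
            continue
        etype = event.get("type", "")
        if etype == "response.output_text.delta":
            deltas.append(event.get("delta") or "")
        elif etype == "response.completed":
            completed = event.get("response")
    return {"output_text": "".join(deltas), "response": completed}


# Hand-written sample stream (illustrative only).
SAMPLE_LINES = [
    "event: response.output_text.delta",
    'data: {"type": "response.output_text.delta", "delta": "Hel"}',
    "",
    'data: {"type": "response.output_text.delta", "delta": "lo"}',
    "data: not-json",
    'data: {"type": "response.completed", "response": {"id": "resp_1"}}',
    "data: [DONE]",
]

if __name__ == "__main__":
    print(collect_output_text(SAMPLE_LINES))
```

Keeping the parser separate from the network call means it can be checked against canned input before it is pointed at a real deployment.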

In [None]:
from rich import print as rprint

# Raw API call using aiohttp (SSE streaming)
# NOTE: This is a minimal illustrative parser for Azure Responses API SSE events.
# It accumulates output_text deltas and captures the final completion event.

async def stream_raw_responses(prompt: str) -> Dict[str, Any]:
    url = f'{endpoint}/openai/responses?api-version={api_version}'
    headers = {
        'Authorization': f'Bearer {bearer}',
        'Content-Type': 'application/json',
        'Accept': 'text/event-stream'
    }
    payload = {
        'model': model,
        'input': prompt,
        'stream': True  # instruct the service to stream via SSE
    }
    events: list[dict] = []
    accumulated_text: list[str] = []
    final_event: dict | None = None

    rprint('[bold blue]--- Streaming (SSE) start ---[/bold blue]')
    async with aiohttp.ClientSession() as session:
        async with session.post(url, headers=headers, data=json.dumps(payload)) as resp:
            resp.raise_for_status()
            async for raw_line in resp.content:
                line = raw_line.decode('utf-8', errors='ignore').strip()
                if not line:
                    continue  # skip keep-alives / empty
                if not line.startswith('data:'):
                    continue  # ignore non-data lines
                data = line[5:].strip()
                if data == '[DONE]':
                    rprint('\n[bold blue]--- Streaming complete marker received ---[/bold blue]')
                    break
                try:
                    evt = json.loads(data)
                except json.JSONDecodeError:
                    continue  # skip data lines that are not valid JSON
                events.append(evt)
                etype = evt.get('type', '')

                # Capture deltas (example type: response.output_text.delta)
                if etype.endswith('output_text.delta'):
                    delta = evt.get('delta') or ''
                    if delta:
                        accumulated_text.append(delta)
                        # Print delta inline without newline for a streaming feel
                        print(delta, end='', flush=True)
                elif etype.endswith('output_text.done'):
                    print()  # newline after finishing text stream
                elif etype == 'response.completed':
                    final_event = evt
                # Could add reasoning / other event handling here if needed
    rprint('[bold blue]--- Streaming (SSE) end ---[/bold blue]')

    # Build a synthesized final result resembling the non-streaming shape for downstream logic
    synthesized: dict = {
        'stream_events_count': len(events),
        'output_text': ''.join(accumulated_text) if accumulated_text else None,
    }
    if final_event:
        # Merge some commonly useful fields
        synthesized.update({
            'id': final_event.get('response', {}).get('id') or final_event.get('id'),
            'model': final_event.get('response', {}).get('model') or final_event.get('model'),
            'usage': final_event.get('response', {}).get('usage') or final_event.get('usage'),
            'reasoning': final_event.get('response', {}).get('reasoning') or final_event.get('reasoning'),
        })
    return synthesized

raw_result = await stream_raw_responses('Tell me about AI and its impact on society.')

# --- Extraction helpers for the synthesized streaming result ---

def extract_reasoning(resp: dict) -> dict:
    reasoning_info = {
        'effort': None,
        'summary': None,
        'reasoning_blocks': []
    }
    rtop = resp.get('reasoning')
    if isinstance(rtop, dict):
        reasoning_info['effort'] = rtop.get('effort')
        reasoning_info['summary'] = rtop.get('summary')
    # Streaming synthesis currently does not capture intermediate reasoning blocks; could be extended.
    return reasoning_info

reasoning_info = extract_reasoning(raw_result)

# Extract token usage (fields stay None if the final event carried no usage)
usage = raw_result.get('usage') or {}
input_tokens = usage.get('input_tokens')
output_tokens = usage.get('output_tokens')
reasoning_tokens = (usage.get('output_tokens_details') or {}).get('reasoning_tokens')

# Print with colors using rich
rprint(f"[bold blue]Token Usage:[/bold blue] [yellow]Input:[/yellow] {input_tokens} | [magenta]Reasoning:[/magenta] {reasoning_tokens} | [cyan]Output:[/cyan] {output_tokens}")
rprint(f"[bold purple]Reasoning Effort:[/bold purple] [white]{reasoning_info.get('effort')}[/white]")
rprint(f"[bold purple]Reasoning Summary:[/bold purple] [white]{reasoning_info.get('summary')}[/white]")
rprint(f"[bold purple]Stream Events Count:[/bold purple] {raw_result.get('stream_events_count')}")


## OpenAI SDK Test (Azure endpoint)
This test uses the `openai` Python SDK configured for Azure. The client is built with `azure_endpoint`, `api_version`, and the Entra ID token passed as `azure_ad_token`. The token comes from the setup cell and is fetched only once, so rerun that cell if it has expired.
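
Both the raw cell above and the SDK cell below read token usage, from a plain dict in one case and from an attribute object in the other. As a rough sketch, a single helper could normalize both shapes. The names `_field` and `summarize_usage` are hypothetical; the field names (`input_tokens`, `output_tokens`, `output_tokens_details.reasoning_tokens`) are the ones this notebook already reads.

```python
from types import SimpleNamespace
from typing import Any


def _field(obj: Any, name: str) -> Any:
    """Read `name` from a dict or an attribute object; None if absent."""
    if obj is None:
        return None
    if isinstance(obj, dict):
        return obj.get(name)
    return getattr(obj, name, None)


def summarize_usage(usage: Any) -> dict:
    """Return input, output, and reasoning token counts, using None for gaps."""
    details = _field(usage, "output_tokens_details")
    return {
        "input_tokens": _field(usage, "input_tokens"),
        "output_tokens": _field(usage, "output_tokens"),
        "reasoning_tokens": _field(details, "reasoning_tokens"),
    }


if __name__ == "__main__":
    # Dict shape, as synthesized by the raw streaming cell.
    print(summarize_usage({"input_tokens": 12, "output_tokens": 30}))
    # Attribute shape, standing in for an SDK usage object.
    sdk_like = SimpleNamespace(
        input_tokens=12,
        output_tokens=30,
        output_tokens_details=SimpleNamespace(reasoning_tokens=8),
    )
    print(summarize_usage(sdk_like))
```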

In [None]:
# OpenAI SDK streaming (SSE) Responses API call
from rich import print as rprint

client = openai.AzureOpenAI(
    api_version=api_version,
    azure_endpoint=endpoint,
    azure_ad_token=bearer,
)

# A short prompt keeps the stream brief while still producing a few deltas.
prompt = "Explain in one short sentence what AI is."

accumulated: list[str] = []
final_response = None
usage = None
reasoning_effort = None
reasoning_summary = None
stream_events = 0

rprint("[bold cyan]--- OpenAI SDK Streaming Start ---[/bold cyan]")
try:
    # Newer OpenAI SDK pattern: context manager for streaming
    with client.responses.stream(model=model, input=prompt) as stream:
        for event in stream:
            stream_events += 1
            etype = getattr(event, "type", None)
            # Output text deltas
            if etype and etype.endswith("output_text.delta"):
                delta = getattr(event, "delta", "") or ""
                if delta:
                    accumulated.append(delta)
                    print(delta, end="", flush=True)
            elif etype and etype.endswith("output_text.done"):
                print()  # newline after final text
            elif etype == "response.completed":
                # The final event exposes the full response in event.response
                final_response = getattr(event, "response", None)
        # The context manager closes the stream and releases the connection on exit
except Exception as e:
    rprint(f"[red]Streaming error:[/red] {e}")

rprint("[bold cyan]--- OpenAI SDK Streaming End ---[/bold cyan]")

# Synthesize details from final response if available
if final_response:
    try:
        usage = getattr(final_response, "usage", None)
        reasoning = getattr(final_response, "reasoning", None)
        if reasoning:
            reasoning_effort = getattr(reasoning, "effort", None)
            reasoning_summary = getattr(reasoning, "summary", None)
    except Exception:
        pass

final_text = "".join(accumulated) if accumulated else None

# Print summary
if usage:
    input_tokens = getattr(usage, "input_tokens", None)
    output_tokens = getattr(usage, "output_tokens", None)
    reasoning_tokens = None
    # Try nested details if present
    try:
        details = getattr(usage, "output_tokens_details", None)
        if details:
            reasoning_tokens = getattr(details, "reasoning_tokens", None)
    except Exception:
        pass
else:
    input_tokens = output_tokens = reasoning_tokens = None

rprint(f"[bold green]Final Text:[/bold green] [white]{final_text}[/white]")
rprint(f"[bold blue]Token Usage:[/bold blue] [yellow]Input:[/yellow] {input_tokens} | [magenta]Reasoning:[/magenta] {reasoning_tokens} | [cyan]Output:[/cyan] {output_tokens}")
rprint(f"[bold purple]Reasoning Effort:[/bold purple] [white]{reasoning_effort}[/white]")
rprint(f"[bold purple]Reasoning Summary:[/bold purple] [white]{reasoning_summary}[/white]")
rprint(f"[bold purple]Stream Events Count:[/bold purple] {stream_events}")
