In [4]:
import asyncio
import httpx
import os
import json
import re

# ---------------------------
# Helper Functions
# ---------------------------
def extract_thinking_parts(text):
    """
    Split model output into visible text and inline <think> reasoning.

    Every closed ``<think>...</think>`` segment is moved to the thinking side.
    A trailing ``<think>`` with no matching ``</think>`` is also treated as
    thinking. That is what a partially streamed response looks like while the
    model is still reasoning, or a response cut off mid-thought. Treating it
    as thinking keeps half-finished reasoning out of the answer text.

    Args:
        text: Raw assistant text, possibly containing <think> tags.

    Returns:
        A tuple (non_thinking_text, thinking_text). Thinking segments are
        joined with newlines, and both strings are stripped.
    """
    open_tag = "<think>"
    thinking_parts = re.findall(r'<think>(.*?)</think>', text, flags=re.DOTALL)
    # Deleting closed segments is equivalent to splitting on them and re-joining.
    remainder = re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL)
    # Any <think> still present has no closing tag after it (otherwise the
    # regex above would have consumed it), so the rest is unfinished thinking.
    unclosed_at = remainder.find(open_tag)
    if unclosed_at != -1:
        thinking_parts.append(remainder[unclosed_at + len(open_tag):])
        remainder = remainder[:unclosed_at]
    return remainder.strip(), "\n".join(thinking_parts).strip()

def parse_response(response_text):
    """
    Extract the Explanation, Answer (or Exact Answer) and Confidence fields.

    The explanation may span several lines. It runs until the next line that
    starts with an ``Exact Answer:``, ``Answer:`` or ``Confidence:`` label, or
    until the end of the text. The answer is the rest of the line after its
    label. Confidence accepts whole or decimal percentages, with optional
    whitespace before ``%``.

    Args:
        response_text: Model output, normally with <think> reasoning removed.

    Returns:
        A dict with these keys:
        - "explanation" (str)
        - "answer" (str)
        - "confidence": an int for whole percentages, a float for decimal
          ones, or "" when no confidence was found (kept for compatibility)
    """
    explanation = ""
    answer = ""
    confidence = ""

    # Lazily capture until the next field label at the start of a line (or the
    # end of the text). This keeps multi-line explanations intact, and an empty
    # explanation no longer swallows the following "Exact Answer:" line.
    explanation_match = re.search(
        r'Explanation:\s*(.*?)\s*(?=^[ \t]*(?:Exact Answer|Answer|Confidence):|\Z)',
        response_text,
        flags=re.DOTALL | re.MULTILINE,
    )
    answer_match = re.search(r'(?:Exact Answer|Answer):\s*(.*?)(?:\n|$)', response_text)
    confidence_match = re.search(r'Confidence:\s*(\d+(?:\.\d+)?)\s*%', response_text)

    if explanation_match:
        explanation = explanation_match.group(1).strip()
    if answer_match:
        answer = answer_match.group(1).strip()
    if confidence_match:
        raw_confidence = confidence_match.group(1)
        # Whole numbers stay ints (as before); only decimal values become floats.
        confidence = float(raw_confidence) if "." in raw_confidence else int(raw_confidence)

    return {"explanation": explanation, "answer": answer, "confidence": confidence}

# ---------------------------
# Prompt Template and Message Formatter
# ---------------------------
# Braces meant for the model are doubled ({{ }}) so that str.format only
# substitutes the {question} placeholder; everything else is sent verbatim.
PROMPT_EXACT_ANSWER = "\n".join(
    [
        "You will be given a question and a response format. Please output the answer to the question following the format.",
        "",
        "Response format:",
        "Explanation: {{your explanation for your final answer}}",
        "Exact Answer: {{your succinct, final answer}}",
        "Confidence: {{your confidence score between 0% and 100% for your answer}}",
        "",
        "Question:",
        "{question}",
    ]
)

def format_message(question):
    """
    Build the single-turn chat payload for a question record.

    The record's "question" entry is substituted into PROMPT_EXACT_ANSWER,
    and the resulting prompt is wrapped as one user message.

    Returns:
        A one-element list: [{"role": "user", "content": <prompt>}].
    """
    user_turn = {
        "role": "user",
        "content": PROMPT_EXACT_ANSWER.format(question=question["question"]),
    }
    return [user_turn]

# ---------------------------
# Create a Single Test Query
# ---------------------------
query = dict(
    id="test_q1",
    question="Test question number 1: What is the capital of country 1?",
    answer_type="exact_match",
    image="",  # unused by this single-query test
)

# Results are written to results/<id>.json, so create the directory up front.
os.makedirs("results", exist_ok=True)
output_file = "results/{}.json".format(query["id"])

# ---------------------------
# Streaming Fetch Function with Continuous File Update and Full Response Accumulation
# ---------------------------
def _build_stream_result(query, content, reasoning, raw_lines):
    """Assemble the JSON-serializable result record for the stream state so far."""
    non_thinking, inline_thinking = extract_thinking_parts(content)
    return {
        "id": query["id"],
        "question": query["question"],
        "raw_response": content,
        "final_reasoning": reasoning,
        "inline_thinking": inline_thinking,
        "parsed": parse_response(non_thinking),
        "full_response": "\n".join(raw_lines),  # every raw data line received, in order
    }


async def stream_single_query_and_update_file(
    query,
    client,
    file_path,
    *,
    model="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
    temperature=0.0,
):
    """
    Stream one chat completion for a query and mirror progress into a JSON file.

    Sends a streaming POST to ``/chat/completions`` on ``client``. Each
    received line has any ``data:`` prefix removed and is recorded. For every
    JSON chunk with a non-empty ``choices`` list, the function:
    - appends ``delta.content`` / ``delta.reasoning_content`` to the running
      text;
    - re-parses the accumulated text;
    - overwrites ``file_path`` with the current result.

    The ``[DONE]`` sentinel ends the stream.

    Args:
        query: Dict with at least "id" and "question" keys.
        client: httpx.AsyncClient whose base_url points at the API server.
        file_path: Path of the JSON file to overwrite after each chunk.
        model: Model name sent in the request payload.
        temperature: Sampling temperature sent in the request payload.

    Returns:
        The last result dict written to ``file_path``, or {} if no chunk with
        choices was received.

    Raises:
        httpx.HTTPStatusError: If the server responds with a 4xx/5xx status.
    """
    payload = {
        "model": model,
        "messages": format_message(query),
        "temperature": temperature,
        "stream": True,
    }

    final_content = ""
    final_reasoning = ""
    full_response_list = []  # raw data lines, in arrival order
    result = {}

    async with client.stream("POST", "/chat/completions", json=payload) as response:
        if response.is_error:
            # Read the body first so the raised error's response.text carries
            # the server's message (e.g. an unknown model name).
            await response.aread()
            response.raise_for_status()

        async for line in response.aiter_lines():
            if not line:
                continue
            # Remove "data:" prefix if present.
            if line.startswith("data:"):
                line = line[len("data:"):].strip()
            # End-of-stream sentinel: not JSON, so stop instead of logging a parse error.
            if line == "[DONE]":
                break
            full_response_list.append(line)
            try:
                chunk = json.loads(line)
            except json.JSONDecodeError as e:
                print(f"Could not parse chunk: {line}\nError: {e}")
                continue

            choices = chunk.get("choices") if isinstance(chunk, dict) else None
            # An empty "choices" list can occur (e.g. usage-only chunks); nothing to add.
            if not choices:
                continue

            delta = choices[0].get("delta") or {}
            # Delta fields may be present with a JSON null value; treat them as empty.
            final_content += delta.get("content") or ""
            final_reasoning += delta.get("reasoning_content") or ""

            result = _build_stream_result(query, final_content, final_reasoning, full_response_list)

            # Write (overwrite) the JSON file with the current result.
            with open(file_path, "w", encoding="utf-8") as f:
                json.dump(result, f, indent=2)

            print(f"Updated {file_path} with current chunk.")

    return result

# ---------------------------
# Main Async Function for the Single Query Streaming
# ---------------------------
async def main():
    """Stream the module-level test ``query`` from the local vLLM server and print the result."""
    api_root = "http://localhost:8000/v1"  # vLLM OpenAI-compatible API endpoint
    async with httpx.AsyncClient(base_url=api_root, timeout=300) as http_client:
        outcome = await stream_single_query_and_update_file(query, http_client, output_file)
        print("Final Result:")
        print(json.dumps(outcome, indent=2))

# ---------------------------
# Run the Async Main Function
# ---------------------------
# Top-level `await` only works inside IPython/Jupyter, where an event loop is
# already running; in a plain .py script it is a SyntaxError. When running this
# code as a script, replace the line below with `asyncio.run(main())`.
await main()


Updated results/test_q1.json with current chunk.
Updated results/test_q1.json with current chunk.
Updated results/test_q1.json with current chunk.
Updated results/test_q1.json with current chunk.
Updated results/test_q1.json with current chunk.
Updated results/test_q1.json with current chunk.
Updated results/test_q1.json with current chunk.
Updated results/test_q1.json with current chunk.
Updated results/test_q1.json with current chunk.
Updated results/test_q1.json with current chunk.
Updated results/test_q1.json with current chunk.
Updated results/test_q1.json with current chunk.
Updated results/test_q1.json with current chunk.
Updated results/test_q1.json with current chunk.
Updated results/test_q1.json with current chunk.
Updated results/test_q1.json with current chunk.
Updated results/test_q1.json with current chunk.
Updated results/test_q1.json with current chunk.
Updated results/test_q1.json with current chunk.
Updated results/test_q1.json with current chunk.
Updated results/test