In [1]:
# Environment setup for the notebook.
import os 
from dotenv import load_dotenv
# Load variables from a local .env file into the process environment
# (python-dotenv) before anything below reads them.
load_dotenv()
import sys
import os
import base64
from PIL import Image
from io import BytesIO
# Put the parent directory first on sys.path.
# NOTE(review): presumably so the local `llm_master` package imported in the
# next cell resolves from the repository root — confirm.
sys.path.insert(0, os.path.abspath('..'))

In [2]:
import time, base64, json, requests, asyncio
import nest_asyncio
# Patch asyncio so nested event loops are allowed (nest_asyncio); the notebook
# kernel already runs a loop.
nest_asyncio.apply()
from typing import List, Dict, Union, Any 
import logging
# Uncomment the next line for verbose debug logging:
# logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', force=True)     
# Import our providers
from llm_master import QueryLLM, LLMConfig
# Shared client: every later cell in this notebook calls `llm.query(...)`.
# NOTE(review): `LLMConfig.from_env()` presumably reads provider settings/keys
# from environment variables — confirm against llm_master.
config = LLMConfig.from_env()
llm = QueryLLM(config)



INFO:llm_master.response_synthesizer:Initialized QueryLLM handler


In [3]:
 
async def run_code_interpreter():
    """Stream a code-interpreter request and echo each chunk as it arrives.

    Sends a single user turn through ``llm.query`` with ``stream=True`` and the
    ``code_interpreter`` tool enabled, then prints every streamed chunk without
    a trailing newline. Any exception is reported on stdout together with its
    traceback instead of propagating.
    """
    import traceback

    # Tool definition: let the backend pick/create the container automatically.
    interpreter_tool = {
        "type": "code_interpreter",
        "container": {
            "type": "auto",
            # "file_ids": ["file-TXT3RH5yycr7MAX2H8kLvq"]
        },
    }

    # The only conversation turn; it is passed via `input`, while `messages`
    # is sent as an empty list.
    user_turn = {
        "role": "user",
        "content": [{
            "type": "input_text",
            "text": "Can you make a simple word document for lesson plan on the topic of 'The Importance of Water' for a 5 year old child? Please ensure you output a word document. You must include the file you generate in the annotation of the output text using a markdown url link in this format as an example: sandbox:/mnt/data/int100.txt",
        }],
    }

    try:
        # With stream=True the awaited result is an async iterator of chunks.
        stream = await llm.query(
            model_name="responses-o4-mini",
            messages=[],
            stream=True,
            tools=[interpreter_tool],
            input=[user_turn],
            reasoning={"effort": "medium", "summary": "auto"},
            text={"format": {"type": "text"}},
            include=["code_interpreter_call.outputs"],
            max_output_tokens=32000,
        )

        print("Text response:")
        async for piece in stream:
            print(piece, end="", flush=True)

    except Exception as err:
        print(f"Error: {str(err)}")
        traceback.print_exc()

# Run the async function with await
await run_code_interpreter()
 

INFO:llm_master.openai_provider:Successfully initialized OpenAI provider


Text response:
Instantiating provider: openai_provider with class OpenAIProvider


INFO:httpx:HTTP Request: POST https://api.openai.com/v1/responses "HTTP/1.1 200 OK"



```py
import docx
doc = docx.Document()
doc.add_paragraph("Test")
doc.save("/mnt/data/test.docx")
"/mnt/data/test.docx exists"
```


```plaintext
'/mnt/data/test.docx exists'
```



<think>

**Generating docx file**

I'm excited to generate a docx file for the user! I need to run code to create the final document, but since the output won’t be visible to the user until they see the link, that's okay. I’ll prepare the lesson plan, which will include the title, age group, objectives, materials, introduction, and main activity. Once the file is created, I’ll provide the link using markdown for easy access. Let's get started on writing the code!

**Creating lesson plan document**

I’m focusing on preparing a lesson plan about the importance of water. I want to include simple facts like how water keeps us alive and helps plants grow. I'll add some discussion questions, a recap for the conclusion, and an extension activity involving drawing water habits and discussing conservation. On

In [None]:
{"type":"response.output_item.done","sequence_number":877,"output_index":2,"item":{"id":"msg_6838c205884c8191a9ef3e5e8c5378170a1145d46963df26","type":"message","status":"completed","content":[{"type":"output_text","annotations":[{"type":"container_file_citation","container_id":"cntr_6838c1f76ff88191b27d1dcd064592550c88636f8949b334","end_index":219,"file_id":"cfile_6838c20627bc81919fbf3bae04fa20e2","filename":"lesson_plan_importance_of_water.docx","start_index":165}],"text":"I‚Äôve created the lesson plan document for \"The Importance of Water\" suitable for a 5-year-old. You can download it here:\n\n[Download the lesson plan (Word document)](sandbox:/mnt/data/lesson_plan_importance_of_water.docx)"}],"role":"assistant"}}

In [3]:
import uuid

async def run_stream():
    """Stream a multimodal prompt that interleaves text and image parts.

    Builds one user message whose ``parts`` alternate instructions with three
    image URLs, sends it through ``llm.query`` with ``stream=True`` and prints
    each chunk as it arrives. Exceptions are printed rather than raised.
    """

    def text_part(body):
        # Text entry in the shape used by the message "parts" list.
        return {"type": "text", "text": body}

    def image_part(url, **options):
        # Image entry; optional keys (e.g. detail) are appended after the URL.
        return {"type": "image", "url": url, **options}

    parts = [
        # The first part is a bare string rather than a typed dict.
        "You are a visual narrative analyst helping me study a storyboard sequence.",
        text_part("Step 1 ‚Äì describe the setting and dominant mood in the first frame before you see anything else."),
        image_part(
            "https://uflo-chat-attachments.s3.us-west-1.amazonaws.com/218/215c88fa-952f-4a21-80d7-2af01ef1c191/ca106aaf-bfa3-45b9-bf70-eb612bbe27d0.png",
            detail="high",
        ),
        text_part("Step 2 ‚Äì compare that first frame with the second image, focusing specifically on the change in camera angle and the subject‚Äôs posture."),
        image_part(
            "https://uflo-chat-attachments.s3.us-west-1.amazonaws.com/218/215c88fa-952f-4a21-80d7-2af01ef1c191/00b5fe2d-ff07-43e1-8f2e-ab73906a30b7.png"
        ),
        text_part("Step 3 ‚Äì after reflecting on that comparison, use it to predict the emotional beat captured in the final image."),
        image_part(
            "https://uflo-chat-attachments.s3.us-west-1.amazonaws.com/218/215c88fa-952f-4a21-80d7-2af01ef1c191/2cc690db-8cc4-4737-8e97-ac96b4256793.png"
        ),
        text_part("Deliver the answer with three sections titled Step 1, Step 2, and Step 3 so I can verify you tracked the interleaved instructions correctly."),
    ]
    conversation = [{"role": "user", "parts": parts}]

    try:
        stream = await llm.query(
            model_name="responses-gpt-4.1",
            # reasoning={"thinking_budget": 0},
            messages=conversation,
            stream=True,
            moderation=False,
        )

        async for piece in stream:
            print(piece, end="", flush=True)

    except Exception as err:
        print(f"Error: {str(err)}")

await run_stream()  # Don't use asyncio.run() here

INFO:llm_master.response_synthesizer:Instantiating provider: openai_provider with class OpenAIProvider
INFO:llm_master.openai_provider:Successfully initialized OpenAI provider
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/responses "HTTP/1.1 200 OK"


**Step 1**  
The setting in the first frame (the bar and line chart) is analytical and data-driven, taking place in a context where information is being compared and evaluated. The dominant mood is investigative and comparative, highlighted by the dual axes (Population and GDP Growth), which imply a search for relationships or patterns among Latin American countries. The viewer is invited to consider both economic and demographic dimensions, creating a sense of curiosity and focus.

---

**Step 2**  
Compared to the first frame, the second image (pie chart) shifts both camera angle and the “subject’s posture.” Here, the perspective changes from a linear, comparative format to a more centralized, holistic one. The camera metaphorically “zooms out,” offering a top-down view that unifies the subject—population—into a single, cohesive whole rather than segmenting it with separate variables. The posture of the data is less about direct comparison and more about proportion and 

In [4]:
import uuid
import time

async def run_stream():
    """Stream a short Gemini completion and report time-to-first-chunk.

    Sends a single user message through ``llm.query`` with ``stream=True``,
    prints the elapsed time (in milliseconds) between issuing the request and
    receiving the first streamed chunk, then echoes every chunk. Exceptions
    are printed rather than raised.
    """
    messages = [
        {
            "role": "user",
            "content": "Write a two sentence story about a cat and a dog"
        }
    ]

    try:
        # perf_counter() is monotonic and high-resolution, so the measured
        # interval cannot be skewed by system clock adjustments the way
        # time.time() can.
        start_time = time.perf_counter()
        first_chunk_seen = False

        response_generator = await llm.query(
            model_name="googleai:gemini-2.5-flash",
            reasoning={"thinking_budget": 0},
            messages=messages,
            stream=True,
            moderation=False,
        )

        async for chunk in response_generator:
            if not first_chunk_seen:
                first_chunk_seen = True
                latency_ms = (time.perf_counter() - start_time) * 1000
                print(f"Latency to first token: {latency_ms:.2f} ms\n")
            print(chunk, end="", flush=True)

    except Exception as e:
        print(f"Error: {str(e)}")

await run_stream()  # Don't use asyncio.run() here

INFO:llm_master.response_synthesizer:Instantiating provider: google_genai with class GoogleGenAIProvider
INFO:google_genai.models:AFC is enabled with max remote calls: 10.
INFO:httpx:HTTP Request: POST https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:streamGenerateContent?alt=sse "HTTP/1.1 200 OK"


Latency to first token: 1381.56 ms

Barnaby, a scruffy terrier mix, lunged at the tabby perched precariously on the fence, only to have the feline's tail flick in his face as she leapt gracefully onto the shed roof. Later that afternoon, they napped in a sunbeam, the dog's chin resting on the cat's fluffy tail, a truce called until the next squirrel or tempting bird appeared. 


In [5]:
from google import genai
from google.genai import types
import time

# Baseline: call the google-genai SDK directly (bypassing `llm`) with the same
# prompt and thinking disabled, to compare time-to-first-chunk.
client = genai.Client()

# perf_counter() is monotonic, so the interval below is not affected by
# wall-clock adjustments.
start_time = time.perf_counter()
first_chunk_time = None

response = client.models.generate_content_stream(
    model="gemini-2.5-flash",
    config=types.GenerateContentConfig(
        thinking_config=types.ThinkingConfig(thinking_budget=0) # Disables thinking
    ),
    contents=["Write a two sentence story about a cat and a dog"]
)
for i, chunk in enumerate(response):
    if i == 0:
        first_chunk_time = time.perf_counter()
        latency_ms = (first_chunk_time - start_time) * 1000
        print(f"Latency to first token: {latency_ms:.2f} ms\n")
    # `chunk.text` can be None for chunks without text parts; avoid printing
    # the literal "None". Flush so chunks appear as they stream in.
    print(chunk.text or "", end="", flush=True)

INFO:google_genai.models:AFC is enabled with max remote calls: 10.
INFO:httpx:HTTP Request: POST https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:streamGenerateContent?alt=sse "HTTP/1.1 200 OK"


Latency to first token: 431.00 ms

Whiskers the cat watched Fluffy the dog chase a squirrel, her tail swishing with a familiar mix of annoyance and quiet contentment at their shared, if differently experienced, afternoon. Later, they both napped in a sunbeam, Fluffy's paw resting near Whiskers' twitching ear, a silent truce to their daily theatrics. 

In [11]:


async def run_query():
    """Run one non-streaming query and print its content plus metadata.

    Calls ``llm.query`` with ``stream=False`` (a response object is returned
    directly), naming an OpenAI model as fallback, then prints the content,
    model name, token usage, cost and latency. Any exception is printed with
    its traceback instead of propagating.
    """
    import traceback

    conversation = [
        {"role": "user", "content": "Explain quantum computing in simple terms"},
    ]

    try:
        result = await llm.query(
            # model_name="accounts/fireworks/models/deepseek-r1",
            model_name="googleai:gemini-2.5-flash",
            # reasoning_effort="low",
            messages=conversation,
            stream=False,  # non-streaming: the full response comes back at once
            # temperature=0.5,
            fallback_provider="openai",
            fallback_model="gpt-4o",
            moderation=False,
        )

        # Body of the answer first ...
        print(result.content)

        # ... followed by the bookkeeping fields exposed on the response.
        print("\n--- Response Metadata ---")
        print(f"Model: {result.model_name}")
        usage = result.usage
        print(f"Input tokens: {usage.input_tokens}")
        print(f"Output tokens: {usage.output_tokens}")
        print(f"Cost: ${result.cost:.6f}")
        print(f"Latency: {result.latency:.2f} seconds")

    except Exception as err:
        print(f"Error: {str(err)}")
        traceback.print_exc()

# Run the async function with await
await run_query()

INFO:google_genai.models:AFC is enabled with max remote calls: 10.
INFO:httpx:HTTP Request: POST https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent "HTTP/1.1 200 OK"


Imagine a regular computer is like a light switch. It can be either **on (1)** or **off (0)**. These are called **bits**.

Quantum computing is fundamentally different. Instead of bits, it uses **qubits**. Here's how to think about them in simple terms:

1.  **Qubits: Not Just On or Off**
    *   **Classical Bit:** A light switch (either 0 or 1).
    *   **Quantum Qubit:** Imagine a spinning coin. While it's spinning, it's not yet heads *or* tails. It's a bit of both! A qubit can be **0, 1, or both 0 and 1 at the same time** (this is called **superposition**).
    *   **Why it matters:** This means a single qubit can hold much more information than a classical bit. If you have many qubits, they can explore many, many possibilities simultaneously.

2.  **Superposition: The "Spinning Coin" State**
    *   Because a qubit can be both 0 and 1 at the same time, a quantum computer can perform calculations on many possibilities concurrently. Instead of trying one path at a time, it can explor

In [None]:

async def test_perplexity():
    """Exercise the "sonar" model in non-streaming and streaming mode.

    First issues a non-streaming query with extra request-body options and
    prints the content. Then streams the same conversation, separating chunks
    that start with ``"\\n<citations>"`` from ordinary text, and finally checks
    the provider object for a ``last_citations`` attribute.
    """
    conversation = [
        {
            "role": "system",
            "content": "You are an artificial intelligence assistant and you need to engage in a helpful, detailed, polite conversation with a user. Answer as concisely as possible."
        },
        {
            "role": "user",
            "content": "How many stars are in the universe?"
        },
    ]

    # --- Non-streaming request -------------------------------------------
    print("Testing non-streaming Perplexity API with citations")
    search_options = {
        "return_images": True,
        "web_search_options": {
            "search_context_size": "low"
        }
    }
    response = await llm.query(
        model_name="sonar",
        messages=conversation,
        stream=False,
        extra_body=search_options,
    )

    print(f"Response content: {response.content}")
    # print(f"Citations: {response.citations}")

    # --- Streaming request -----------------------------------------------
    print("\nTesting streaming Perplexity API with citations")
    stream_generator = await llm.query(
        model_name="sonar",
        messages=conversation,
        stream=True
    )

    text_pieces = ""
    saw_citations = False

    async for piece in stream_generator:
        if not piece.startswith("\n<citations>"):
            # Ordinary text chunk: accumulate and echo.
            text_pieces += piece
            print(f"Received chunk: {piece}")
            continue
        print(f"Found citations in stream: {piece}")
        saw_citations = True

    print(f"\nFull response: {text_pieces[:100]}...")

    # Once the stream is exhausted, look for citations stored on the provider.
    provider = llm._get_provider("sonar")
    stored_citations = getattr(provider, 'last_citations', None)
    if stored_citations:
        print(f"Citations from provider.last_citations: {stored_citations}")
        saw_citations = True

    if not saw_citations:
        print("No citations found in streaming response")

await test_perplexity()


In [None]:
# Image-generation smoke test: send one text prompt to the "recraftv3" model
# through the shared `llm` client and print what comes back.
# Define the prompt
prompt = "A cat on its back legs running like a human is holding a big silver fish with its arms. The cat is running away from the shop owner and has a panicked look on his face. The scene is situated in a crowded market."

try:
    # Generate the image
    # No `stream` argument is passed here; `style` is forwarded as an extra
    # keyword to `llm.query`.
    response = await llm.query(
        model_name="recraftv3",
        messages=[{"role": "user", "content": prompt}],
        style="digital_illustration"
    )
    
    # Print the result (which is the image URL)
    print(f"Image generation successful! URL: {response.content}")
    
except Exception as e:
    # Report the failure on stdout instead of raising, so the cell completes.
    print(f"Error generating image: {str(e)}")

In [None]:

async def run_query():
    """Request a text+audio answer and save the audio to ``dog_response.wav``.

    Calls ``llm.query`` in non-streaming mode with text and audio modalities,
    prints the textual content, and, when the response carries base64 audio
    data, decodes it and writes it to disk. Any exception is printed with its
    traceback instead of propagating.
    """
    import traceback

    conversation = [
        {"role": "user", "content": "Explain quantum computing in simple terms"},
    ]
    voice_settings = {"voice": "ash", "format": "wav"}

    try:
        # Non-streaming: the complete response object is returned.
        reply = await llm.query(
            model_name="gpt-4o-mini-audio-preview",
            messages=conversation,
            stream=False,
            modality=["text", "audio"],
            audio=voice_settings,
        )

        print("Text response:")
        print(reply.content)

        if not reply.audio_data:
            print("\nNo audio data received in the response")
        else:
            # Audio arrives base64-encoded; decode before writing the file.
            decoded_audio = base64.b64decode(reply.audio_data)
            output_file = "dog_response.wav"
            with open(output_file, "wb") as wav_out:
                wav_out.write(decoded_audio)
            print(f"\nAudio saved to '{output_file}'")

    except Exception as err:
        print(f"Error: {str(err)}")
        traceback.print_exc()

# Run the async function with await
await run_query()

INFO:llm_master.response_synthesizer:Instantiating provider: openai with class UnifiedProvider
INFO:llm_master.base_provider:Initialized openai provider with base URL: None
INFO:llm_master.base_provider:Successfully initialized openai provider
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


Text response:
None

Audio saved to 'dog_response.wav'


INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/responses "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/responses "HTTP/1.1 200 OK"


In [None]:

async def run_query(text: str = "", output_file: str = "dog_response.wav"):
    """Ask the audio model to narrate ``text`` and save the resulting audio.

    The prompt was previously an f-string with no placeholder, so nothing was
    ever narrated; ``text`` is now interpolated into it. With the default
    arguments the request and the output path are identical to before.

    Args:
        text: Text to narrate; appended after "Narrate the text: ".
        output_file: Path the decoded WAV bytes are written to.

    Any exception is printed with its traceback instead of propagating.
    """
    messages = [
        {
            "role": "user",
            "content": f"Narrate the text: {text}"
        },
    ]

    try:
        # With stream=False, you get a LLMResponse object directly
        response = await llm.query(
            model_name="gpt-4o-mini-audio-preview",
            messages=messages,
            stream=False,
            modality=["text", "audio"],
            audio={"voice": "ash", "format": "wav"}
        )

        # Print the text response
        print("Text response:")
        print(response.content)

        # Save the audio to a file if available
        if response.audio_data:
            # Audio arrives base64-encoded; decode before writing the file.
            wav_bytes = base64.b64decode(response.audio_data)
            with open(output_file, "wb") as f:
                f.write(wav_bytes)
            print(f"\nAudio saved to '{output_file}'")
        else:
            print("\nNo audio data received in the response")

    except Exception as e:
        print(f"Error: {str(e)}")
        import traceback
        traceback.print_exc()

# Run the async function with await
await run_query()

In [7]:
import yaml
MODEL_NAME = "vertexai:gemini-2.5-pro"
async def test_gemini_caching_with_long_prompt():
    """Check whether repeated calls sharing a long prompt prefix report cached tokens.

    Loads the entry with ``type == "socratic"`` from
    ``../chat/prompts/roleplay_prompts.yaml``, embeds its prompt in a long user
    message, and then issues three requests against ``MODEL_NAME`` that share
    that prefix: an initial call, a follow-up call (after a 10 second pause),
    and a streaming call. Usage and cost are printed for each, followed by a
    summary of input/output/cached token counts.

    Raises:
        FileNotFoundError: if the YAML file does not exist relative to the
            current working directory.
        ValueError: if the YAML file has no entry of type "socratic".
    """
    prompts_path = "../chat/prompts/roleplay_prompts.yaml"
    with open(prompts_path, "r", encoding="utf-8") as f:
        roleplay_prompts = yaml.safe_load(f)

    socratic_prompt = next((item for item in roleplay_prompts if item.get("type") == "socratic"), None)
    if socratic_prompt is None:
        # Fail with a clear message instead of an AttributeError on None below.
        raise ValueError(f'No prompt with type "socratic" found in {prompts_path}')

    # Create a long prompt with the documentation
    # NOTE(review): the wording mentions the Desmos API documentation, but the
    # embedded text is the socratic role-play prompt; left unchanged so the
    # request stays the same.
    long_prompt = f"""Here is the complete Desmos API documentation:

{socratic_prompt.get("prompt")}

Based on this documentation, please answer this simple question: What is this text about? Give me a brief 2-3 sentence summary."""

    messages = [{"role": "user", "content": long_prompt}]

    print("Testing Gemini caching with long prompt...")
    print(f"Prompt length: {len(long_prompt)} characters")
    print("=" * 60)

    # Test 1: First call (should not have cache hits)
    print("\nüî• FIRST CALL (no cache):")
    response1 = await llm.query(
        model_name=MODEL_NAME,
        messages=messages,
        stream=False
    )
    print(f"Response: {response1.content}")
    print(f"Usage: {response1.usage}")
    print(f"Cost: ${response1.cost:.6f}")

    # Pause between calls without blocking the event loop (time.sleep would
    # freeze every other task running on it).
    await asyncio.sleep(10)

    # Test 2: Second call with previous response + new question (should have cache hits if caching works)
    print("\n‚ôªÔ∏è  SECOND CALL (with previous response + new question):")

    follow_up_question = "Based on your summary, can you now tell me what are the 3 most important features of a good socratic conversation?"

    # Create new messages including the previous response and a new question
    messages_with_response = [
        {"role": "user", "content": long_prompt},
        {"role": "assistant", "content": response1.content},
        {"role": "user", "content": follow_up_question}
    ]

    response2 = await llm.query(
        model_name=MODEL_NAME,
        messages=messages_with_response,
        stream=False
    )
    print(f"Response: {response2.content}")
    print(f"Usage: {response2.usage}")
    print(f"Cost: ${response2.cost:.6f}")

    # Test 3: Streaming version with same conversation context
    print("\nüåä STREAMING CALL (with conversation context):")
    messages_with_response2 = [
        {"role": "user", "content": long_prompt},
        {"role": "assistant", "content": response1.content},
        {"role": "user", "content": follow_up_question},
        {"role": "assistant", "content": response2.content},
        {"role": "user", "content": "What is the most difficult part of a socratic conversation in your opinion?"}
    ]
    stream = await llm.query(
        model_name=MODEL_NAME,
        messages=messages_with_response2,
        stream=True
    )

    full_content = ""
    async for chunk in stream:
        full_content += chunk

    # Streaming yields only text chunks, so usage is read back from the
    # provider's `last_usage` attribute when it exists and is non-empty.
    provider = llm._get_provider(MODEL_NAME)
    stream_usage = getattr(provider, 'last_usage', None)
    if stream_usage:
        print(f"Streamed response: {full_content}")
        print(f"Usage: {stream_usage}")
        try:
            from llm_master.classes import ModelRegistry
            model_config = ModelRegistry.get_config(MODEL_NAME)
            cost = stream_usage.calculate_cost(model_config)
            print(f"Cost: ${cost:.6f}")
        except Exception as e:
            # Best effort: cost reporting must not abort the summary below.
            print(f"Cost calculation failed: {e}")

    print("\n" + "=" * 60)
    print("üîç ANALYSIS:")
    print(f"Call 1 tokens: {response1.usage.input_tokens} input, {response1.usage.output_tokens} output, {response1.usage.cached_tokens} cached")
    print(f"Call 2 tokens: {response2.usage.input_tokens} input, {response2.usage.output_tokens} output, {response2.usage.cached_tokens} cached")
    if stream_usage:
        print(f"Stream tokens: {stream_usage.input_tokens} input, {stream_usage.output_tokens} output, {stream_usage.cached_tokens} cached")

    if response2.usage.cached_tokens > 0:
        print("üéâ CACHING IS WORKING!")
    else:
        print("‚ùå No cache hits detected")

# Run the test
await test_gemini_caching_with_long_prompt()


FileNotFoundError: [Errno 2] No such file or directory: '../chat/prompts/roleplay_prompts.yaml'

In [None]:
from openai import OpenAI
# Direct OpenAI Responses API call (bypassing `llm`) with the web search
# preview tool enabled.
# NOTE(review): this rebinds the notebook-global `client`, which an earlier
# cell assigned to a `genai.Client()`; re-running that cell's code after this
# one would use the wrong client object.
client = OpenAI()

response = client.responses.create(
    model="o3",
    tools=[{
        "type": "web_search_preview",
        "search_context_size": "low",
    }],
    input="What movie won best picture in 2025?",
)

# `output_text` is the SDK's convenience accessor for the response's text.
print(response.output_text)