In [5]:
# %pip install google-genai openai langfuse openinference-instrumentation-google-genai

In [None]:
from langfuse import Langfuse
from langfuse import get_client
from langfuse import observe, propagate_attributes, Langfuse
from dotenv import load_dotenv
from google import genai
import json
from google.genai import types
import os

# Load variables from the local ".env" file so the os.getenv() lookups in this
# file can see them.
load_dotenv(".env")

# Module-level Langfuse client shared by the functions below. Keys and host are
# read from the environment; os.getenv() yields None for any that are unset.
langfuse = Langfuse(
    public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
    secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
    host=os.getenv("LANGFUSE_HOST")
)

@observe(name="call-gemini-common-fn", as_type="generation", capture_input=True, capture_output=True)
def call_gemini(input, ground_truth, model_id="gemini-2.0-flash", file_paths=None, generation_config=None):
    """
    Send a prompt and optional files to Gemini and trace the call with Langfuse.

    Every path in ``file_paths`` is uploaded, passed to the model after the
    prompt, and deleted again once the call finishes (whether it succeeded or
    not). Token usage and an estimated cost are attached to the current
    Langfuse generation; the prompt, file names, response text and ground
    truth are attached to the current trace.

    Args:
        input: Text prompt/instruction.
        ground_truth: Reference answer; stored in the trace metadata and
            returned unchanged.
        model_id: Gemini model to use.
        file_paths: List of file paths, a single file path (string), or None
            for a text-only request.
        generation_config: Optional generation config forwarded to
            ``generate_content``. None leaves the API defaults in place.

    Returns:
        Tuple ``(response.text, ground_truth)``.

    Raises:
        FileNotFoundError: Re-raised after being printed.
        Exception: Any other upload/generation error is printed and re-raised.
    """
    with propagate_attributes(
        user_id="eshanj",
        session_id="session_x",
        tags=["gemini", "eshan's-trace", "multi-file"],
        metadata={"email": "eshan@fonixedu.com"},
        version="1.0.0",
    ):
        client = genai.Client(api_key=os.getenv("GOOGLE_API_KEY"))

        # Handle both single file and multiple files
        if file_paths is None:
            file_paths = []
        elif isinstance(file_paths, str):
            file_paths = [file_paths]

        uploaded_files = []

        try:
            # Upload all files
            for file_path in file_paths:
                print(f"  Uploading file: {file_path}...")
                uploaded_file = client.files.upload(file=file_path)
                uploaded_files.append(uploaded_file)
                print(f"  ✓ File uploaded: {uploaded_file.name} (URI: {uploaded_file.uri})")

            # Build content array: [prompt, file1, file2, file3, ...]
            contents = [input] + uploaded_files

            print(f"  Processing {len(uploaded_files)} file(s) with prompt...")

            # Always forward the caller's config. The previous branching was
            # inverted and dropped the config exactly when one was supplied.
            response = client.models.generate_content(
                model=model_id,
                contents=contents,
                config=generation_config,
            )

            # Read counters defensively: the metadata object or any single
            # counter may be absent, in which case the count is treated as 0.
            usage_meta = response.usage_metadata
            prompt_tokens = getattr(usage_meta, "prompt_token_count", None) or 0
            candidate_tokens = getattr(usage_meta, "candidates_token_count", None) or 0
            thought_tokens = getattr(usage_meta, "thoughts_token_count", None) or 0
            cached_tokens = getattr(usage_meta, "cached_content_token_count", None) or 0

            # Thinking tokens are counted (and priced) together with the
            # candidate tokens as output.
            effective_output_tokens = candidate_tokens + thought_tokens

            langfuse.update_current_trace(
                input={
                    "prompt": input,
                    "files": [f.name for f in uploaded_files],
                    "file_count": len(uploaded_files)
                },
                output=response.text,
                metadata={
                    "ground_truth": ground_truth,
                }
            )

            # NOTE(review): these per-token prices are hard-coded and applied
            # regardless of model_id — confirm they match the model in use.
            INPUT_PRICE_PER_TOKEN = 0.3 / 1000000
            OUTPUT_PRICE_PER_TOKEN = 2.5 / 1000000
            CACHING_PRICE_PER_TOKEN = 0.03 / 1000000

            # NOTE(review): if prompt_token_count already includes cached
            # tokens, they are charged at both rates here — confirm against
            # the Gemini usage-metadata documentation.
            input_cost = prompt_tokens * INPUT_PRICE_PER_TOKEN
            output_cost = effective_output_tokens * OUTPUT_PRICE_PER_TOKEN
            cache_read_input_cost = cached_tokens * CACHING_PRICE_PER_TOKEN
            total_cost = input_cost + output_cost + cache_read_input_cost

            langfuse.update_current_generation(
                cost_details={
                    "input": input_cost,
                    "cache_read_input_tokens": cache_read_input_cost,
                    "output": output_cost,
                    "total": total_cost,
                },
                usage_details={
                    "input": prompt_tokens,
                    "output": effective_output_tokens,
                    "cache_read_input_tokens": cached_tokens
                },
            )

            return response.text, ground_truth

        except FileNotFoundError as e:
            print(f"  ERROR: File not found: {e}")
            raise
        except Exception as e:
            print(f"  An error occurred: {e}")
            raise
        finally:
            # Best-effort cleanup: a failed delete is reported but must not
            # mask the result or the original exception.
            for uploaded_file in uploaded_files:
                try:
                    print(f"  Deleting uploaded file: {uploaded_file.name}...")
                    client.files.delete(name=uploaded_file.name)
                    print(f"  ✓ File deleted: {uploaded_file.name}")
                except Exception as e:
                    print(f"  Failed to delete {uploaded_file.name}: {e}")

@observe(as_type="evaluator")
def evaluate_with_gemini(prediction, ground_truth):
    """
    Ask Gemini to grade a prediction against a ground truth and record the score.

    The model is prompted to answer with a JSON object holding ``score`` and
    ``reason``. The parsed score is attached to the current Langfuse trace.

    Args:
        prediction: Model output to be graded; interpolated into the prompt.
        ground_truth: Reference answer; interpolated into the prompt.

    Returns:
        Tuple ``(score, reason)`` where ``score`` is a float.

    Raises:
        ValueError: If Gemini returns no text or text that is not valid JSON.
        KeyError: If the JSON object lacks ``score`` or ``reason``.
    """
    eval_generation_config = types.GenerateContentConfig(
        temperature=0.0,
        top_p=0.9,  # Nucleus sampling threshold
        top_k=40,  # Number of top tokens to sample from
        max_output_tokens=256,  # Maximum tokens to generate
        # frequency_penalty=0.1,
        # presence_penalty=0.1,
        system_instruction="You are an evaluator. Compare the ground truth and the prediction.",
        # tools=tools_list,
        response_mime_type="application/json",  # Request JSON output
        thinking_config=types.ThinkingConfig(
            # Token budget for the model's internal thought process
            thinking_budget=1024,
            # Include the model's internal thoughts in the response
            include_thoughts=True,
        ),
    )

    eval_prompt = f"""
    Return a JSON object with exactly two fields:

    - "score": a float between 0 and 1 inclusive
    - "reason": a short explanation of why the score was given

    STRICT RULES:
    - Output ONLY valid JSON.
    - Do NOT include backticks, markdown, or any text outside the JSON.
    - "score" MUST be a float.
    - "reason" MUST be a string.

    ground_truth:
    {ground_truth}

    prediction:
    {prediction}
    """

    raw_output = call_gemini(
        eval_prompt,
        ground_truth=None,
        model_id="gemini-3-pro-preview",
        file_paths=None,
        generation_config=eval_generation_config
    )

    # If call_gemini returns a tuple → extract the text
    if isinstance(raw_output, tuple):
        raw_output = raw_output[0]

    print("Gemini Raw Output:", raw_output)

    # The response text can be None/empty (e.g. no text part was produced);
    # report that as a clear error rather than failing on None.replace().
    if not isinstance(raw_output, str) or not raw_output.strip():
        raise ValueError(f"Gemini did not return valid JSON: {raw_output!r}")

    # Strip Markdown code fences in case the model added them anyway.
    clean_json = raw_output.replace("```json", "").replace("```", "").strip()

    try:
        result = json.loads(clean_json)
    except json.JSONDecodeError as e:
        raise ValueError(f"Gemini did not return valid JSON: {clean_json}") from e

    score = float(result["score"])
    reason = result["reason"]

    print(" Score:", score)
    print(" Reason:", reason)

    langfuse.score_current_trace(
        name="score",
        value=score,
        comment=reason,
    )

    return score, reason

@observe(name="gemini-qa-pipeline", as_type="chain")
def main():
    """Run the document-analysis prompt on the sample image through call_gemini.

    Several alternative instruction prompts are kept here for experimentation;
    only INSTRUCTION_PROMPT_SIMPLE is actually sent. The follow-up evaluation
    step is currently switched off.
    """
    # Full schema variant, including bounding boxes.
    INSTRUCTION_PROMPT_COMPLEX = """
  Task: Identify all meaningful blocks of content and extract the structural relationships between them.

  JSON Schema: Output the prediction using the 'document_elements' array, where each object contains:
  - id (string): A unique identifier (e.g., B1, N_Start).
  - text (string): The transcribed content.
  - type (enum): The element's function. Use only: TITLE, PARAGRAPH, LIST, TABLE_CELL, DIAGRAM_NODE, DIAGRAM_ARROW, KEY_VALUE_PAIR.
  - bbox (array of 4 integers): Normalized coordinates [xmin, ymin, xmax, ymax]. All values MUST be integers between 0 and 100.
  - relations (array of objects): A list of semantic connections.

  Relations Schema (Inside relations):
  - target_id (string): The id of the element it connects to.
  - relation_type (enum): The connection type. Use: FLOWS_TO, IS_LABEL_FOR, VALUE_FOR.

  Specific Instructions:
  1. For diagrams, use DIAGRAM_NODE for shapes and DIAGRAM_ARROW for lines. Use FLOWS_TO to link the source node to the target node.
  2. For forms/tables, use KEY_VALUE_PAIR. If a value is separated from its label, link them using VALUE_FOR.
  """

    # Shorter schema variant without bounding boxes (the one in use).
    INSTRUCTION_PROMPT_SIMPLE = """
  Task: Identify all meaningful blocks of content and extract the structural relationships between them.

  JSON Schema: Output using the 'document_elements' array, where each object contains:
  - id (string): Unique identifier (e.g., B1, N1)
  - text (string): The transcribed content
  - type (enum): TITLE, PARAGRAPH, LIST, TABLE_CELL, DIAGRAM_NODE, DIAGRAM_ARROW, KEY_VALUE_PAIR
  - relations (array): Semantic connections with:
  - target_id (string): Connected element's id
  - relation_type (enum): FLOWS_TO, IS_LABEL_FOR, VALUE_FOR
  
  Specific Instructions:
  1. For diagrams: Use DIAGRAM_NODE for shapes, DIAGRAM_ARROW for lines. Link with FLOWS_TO.
  2. For forms/tables: Use KEY_VALUE_PAIR. Link labels to values using VALUE_FOR.
  """

    # Markdown-formatted variant: page description plus transcription table.
    INSTRUCTION_PROMPT_MARKDOWN = """
  # Document Analysis

  ## Task
  Analyze the document and provide:
  1. A markdown description of the page content (diagrams, tables, images, etc.)
  2. Complete transcription of all text

  ## Output Format

  ### 1. Page Description
  Provide a brief markdown summary describing:
  - Document type and layout
  - Visual elements present (diagrams, tables, charts, images)
  - Overall structure

  ### 2. Transcription
  Output using the `document_elements` array:

  | Field | Type | Description |
  |-------|------|-------------|
  | `id` | string | Unique identifier (e.g., B1, N1) |
  | `text` | string | The transcribed content |
  | `type` | enum | TITLE, PARAGRAPH, LIST, TABLE_CELL, DIAGRAM_NODE, KEY_VALUE_PAIR |
  | `relations` | array | Connections: `target_id` and `relation_type` (FLOWS_TO, VALUE_FOR) |

  ## Instructions
  - **Text:** Transcribe all visible text
  - **Diagrams:** Describe flow and connections using FLOWS_TO
  - **Tables/Forms:** Link labels to values using VALUE_FOR
  """

    # Minimal Markdown variant: free-form transcription and description.
    INSTRUCTION_SIMPLE_PROMPT_MARKDOWN = """
  ## Task
  Analyze the document image and provide:
  1. **Transcribe** all text exactly as written
  2. **Describe** any diagrams, tables, or visual elements

  ## Output
  Respond in **Markdown format**:
  - Use headings, lists, and tables to match the document structure
  - For diagrams, describe the flow (e.g., A → B → C)
  - Mark unclear text as `[unclear]`
  """

    # No reference answer is available for this run.
    expected_output = ''

    # Thinking settings are built separately to keep the main config readable.
    reasoning_options = types.ThinkingConfig(
        thinking_budget=-1,
        include_thoughts=True,
    )
    analysis_config = types.GenerateContentConfig(
        temperature=0.0,
        top_p=0.9,
        top_k=40,
        max_output_tokens=8192,
        system_instruction="You are an expert Document Analyzer.",
        thinking_config=reasoning_options,
    )

    print("Layout analysis and transcription process started...")
    prediction, ground_truth = call_gemini(
        INSTRUCTION_PROMPT_SIMPLE,
        expected_output,
        model_id="gemini-3-pro-preview",
        file_paths="Generated Image December 09, 2025 - 4_41PM.jpeg",
        generation_config=analysis_config,
    )
    # Evaluation is disabled for now; to re-enable, print "Evaluation started..."
    # and call evaluate_with_gemini(prediction, ground_truth).

if __name__ == "__main__":
    try:
        main()
    finally:
        # Flush even when main() raises or is interrupted, so traces that were
        # already recorded are still sent to Langfuse.
        langfuse.flush()

Layout analysis and transcription process started...
  Uploading file: Generated Image December 09, 2025 - 4_41PM.jpeg...


KeyboardInterrupt: 