In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import os
import re
import time
import queue
import logging
import threading
from tqdm import tqdm
from collections import deque
from datetime import datetime
from google import genai

In [None]:
# Ensure the output folder exists
os.makedirs("content", exist_ok=True)

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("gemini_geometry_solver.log"),
        logging.StreamHandler()
    ]
)

In [None]:
class GeminiVisionApiManager:
    """
    Manages multiple Gemini API keys with rotation and rate limiting for vision tasks.
    Handles file uploads and content generation with automatic key switching.
    """

    def __init__(self, api_keys, calls_per_day=200, rate_limit_delay=4):
        """
        Initialize the API manager with multiple API keys.

        Args:
            api_keys (list): List of Gemini API keys.
            calls_per_day (int): Maximum number of calls allowed per key per day.
            rate_limit_delay (int): Seconds to wait between API calls (4 seconds for 15 RPM).
        """
        self.api_keys = deque(api_keys)
        self.calls_per_day = calls_per_day
        self.rate_limit_delay = rate_limit_delay

        # Track usage for each key
        self.usage_count = {key: 0 for key in api_keys}
        self.current_key = self.api_keys[0]
        self.client = genai.Client(api_key=self.current_key)

        # Set up a queue for API calls
        self.call_queue = queue.Queue()
        self.worker_thread = threading.Thread(target=self._process_queue)
        self.worker_thread.daemon = True
        self.worker_thread.start()

        # Cache for uploaded files to avoid re-uploading
        self.file_cache = {}

        logging.info(f"Vision API Manager initialized with {len(api_keys)} keys")

    def _rotate_key(self):
        """Rotate to the next available API key."""
        self.api_keys.rotate(1)
        self.current_key = self.api_keys[0]
        self.client = genai.Client(api_key=self.current_key)
        # Clear file cache when rotating keys as files are key-specific
        self.file_cache.clear()
        logging.info(f"Rotated to new API key (usage: {self.usage_count[self.current_key]})")

    def _find_available_key(self):
        """Find an API key that hasn't reached the daily limit."""
        initial_key = self.current_key

        if self.usage_count[self.current_key] < self.calls_per_day:
            return True

        for _ in range(len(self.api_keys)):
            self._rotate_key()
            if self.usage_count[self.current_key] < self.calls_per_day:
                return True
            if self.current_key == initial_key:
                return False

        return False

    def _process_queue(self):
        """Process the queue of API calls."""
        while True:
            try:
                task_type, args, kwargs, result_queue = self.call_queue.get()
                
                if not self._find_available_key():
                    result_queue.put({"error": "All API keys have reached their daily limit"})
                    self.call_queue.task_done()
                    continue

                try:
                    if task_type == "upload":
                        response = self.client.files.upload(**kwargs)
                        result_queue.put({"response": response})
                    elif task_type == "generate":
                        response = self.client.models.generate_content(*args, **kwargs)
                        result_queue.put({"response": response})
                    
                    self.usage_count[self.current_key] += 1
                    
                except Exception as e:
                    if "quota" in str(e).lower() or "rate limit" in str(e).lower():
                        self.usage_count[self.current_key] = self.calls_per_day
                        logging.warning(f"API key reached rate limit: {str(e)}")
                        result_queue.put({"error": f"Rate limit: {str(e)}"})
                    else:
                        logging.error(f"API call error: {str(e)}")
                        result_queue.put({"error": str(e)})
                
                time.sleep(self.rate_limit_delay)
                self.call_queue.task_done()
                
            except Exception as e:
                logging.error(f"Queue processing error: {str(e)}")
                continue

    def upload_file(self, file_path):
        """
        Upload a file to Gemini, with caching to avoid re-uploads.
        
        Args:
            file_path (str): Path to the file to upload
            
        Returns:
            The uploaded file object
        """
        # Use current key + file path as cache key
        cache_key = f"{self.current_key}:{file_path}"
        
        if cache_key in self.file_cache:
            return self.file_cache[cache_key]
        
        result_queue = queue.Queue()
        self.call_queue.put(("upload", [], {"file": file_path}, result_queue))
        result = result_queue.get()
        
        if "error" in result:
            raise Exception(result["error"])
            
        # Cache the uploaded file
        self.file_cache[cache_key] = result["response"]
        return result["response"]

    def generate_content(self, *args, **kwargs):
        """
        Make an API call to generate content, automatically handling key rotation.

        Returns:
            The response from the API call.
        """
        result_queue = queue.Queue()
        self.call_queue.put(("generate", args, kwargs, result_queue))
        result = result_queue.get()
        
        if "error" in result:
            raise Exception(result["error"])
        return result["response"]

    def reset_usage_counts(self):
        """Reset the usage counts for all keys (e.g., at the start of a new day)."""
        self.usage_count = {key: 0 for key in self.api_keys}
        self.file_cache.clear()  # Clear file cache when resetting
        logging.info("Reset API key usage counts and file cache")

    def get_usage_stats(self):
        """Get usage statistics for all keys."""
        total_used = sum(self.usage_count.values())
        total_available = len(self.api_keys) * self.calls_per_day
        return {
            "per_key": self.usage_count,
            "total_used": total_used,
            "total_available": total_available,
            "percent_used": (total_used / total_available) * 100 if total_available > 0 else 0
        }

In [None]:
def solve_geometry_problem_with_gemini(api_manager, image_path, question, choices):
    """
    Solve geometry problem using Gemini Vision with step-by-step reasoning

    Args:
        api_manager: GeminiVisionApiManager instance
        image_path (str): Path to the geometry problem image
        question (str): Question to solve
        choices (str): Multiple choice options (A, B, C, D)

    Returns:
        dict: Contains 'content'
    """
    try:
        # Upload the image file
        uploaded_file = api_manager.upload_file(image_path)
        
        # Enhanced prompt to force choice selection with vision
        prompt = f"""You are an AI mathematician solving geometric problems through visual analysis and logical deduction.

GEOMETRY PROBLEM IMAGE:
The image shows a geometric figure with various shapes, lines, angles, and measurements. Analyze this image carefully to understand the geometric relationships and constraints.

QUESTION:
{question}

ANSWER CHOICES:
{choices}

SOLUTION STEPS:
1. Carefully examine the geometric figure in the image and identify all the given information
2. Understand what measurements, angles, or relationships are shown
3. Identify what needs to be found based on the question
4. Apply relevant geometric theorems, properties, and formulas step by step
5. Show all your calculations clearly
6. Compare your calculated result with the given answer choices
7. Even if your calculated answer doesn't exactly match any choice, select the closest one

⚠️ CRITICAL OUTPUT FORMAT REQUIREMENT ⚠️
YOU MUST END YOUR RESPONSE WITH EXACTLY ONE OF THESE FOUR LINES:
Final Answer: A
Final Answer: B
Final Answer: C
Final Answer: D

❌ ABSOLUTELY FORBIDDEN - DO NOT USE:
- "The final answer is $\\boxed{{14}}$"
- "The final answer is $\\boxed{{A}}$"
- "$\\boxed{{A}}$"
- "\\boxed{{A}}"
- "(A)"
- "A is correct."
- "Final Answer: The answer is A"
- Any LaTeX formatting
- Any mathematical notation
- Any additional text after the letter

✅ REQUIRED FORMAT EXAMPLES:
If you determine the answer is choice A: "Final Answer: A"
If you determine the answer is choice B: "Final Answer: B"
If you determine the answer is choice C: "Final Answer: C"
If you determine the answer is choice D: "Final Answer: D"

IMPORTANT: Your response must end with exactly "Final Answer: [SINGLE LETTER]" - nothing else on that line. Do not include any boxed notation, LaTeX, or mathematical formatting in your final line.

Begin your analysis now and remember to end with the exact required format."""

        # Generate content with the uploaded image and prompt
        response = api_manager.generate_content(
            model="gemini-2.0-flash",
            contents=[uploaded_file, prompt]
        )

        return {
            'content': response.text.strip()
        }
        
    except Exception as e:
        logging.error(f"Error processing image {image_path}: {e}")
        return {'content': f'Error processing image: {str(e)}'}

In [None]:
def extract_answer_letter(content):
    """
    Enhanced function to extract an answer letter from a model's output.
    It includes patterns for plain text, markdown, and LaTeX formats to ensure
    the answer is captured reliably.
    Args:
        content (str): The model's output content.
    Returns:
        str: The extracted answer letter (A, B, C, or D), or an empty string if not found.
    """
    # Enhanced list of regex patterns to try, in order of preference
    patterns = [
        # LaTeX box patterns - FIXED PATTERNS
        r"\\boxed\{([A-D])\}",                # Handles '\boxed{A}' or '$\boxed{A}$'
        r"\$\\boxed\{([A-D])\}\$",            # Handles '$\boxed{A}$'
        r"\\boxed\{\\text\{([A-D])\}\}",      # Handles '\boxed{\text{A}}'
        r"\$\\boxed\{\\text\{([A-D])\}\}\$",  # Handles '$\boxed{\text{A}}$'
        r"The final answer is \$\\boxed\{([A-D])\}\$",  # 'The final answer is $\boxed{A}$'
        r"The final answer is \\boxed\{([A-D])\}",      # 'The final answer is \boxed{A}'
        r"The final answer is \$\\boxed\{(\d+)\}\$",    # Extract from numeric boxed answers
        
        # Standard patterns
        r"Final Answer:\s*([A-D])\b",        # Final Answer: A
        r"Final Answer:\s*\*\*([A-D])\*\*",  # Final Answer: **A**
        r"Answer:\s*([A-D])\b",              # Answer: A
        r"Answer:\s*\*\*([A-D])\*\*",        # Answer: **A**
        r"Answer:\s*\*([A-D])\*",            # Answer: *A*
        r"Answer:\s*_([A-D])_",              # Answer: _A_
        r"Answer:\s*\(([A-D])\)",            # Answer: (A)
        r"Answer:\s*([A-D])\.",              # Answer: A.
        
        # Sentence-based patterns
        r"The answer is\s*([A-D])\b",        # The answer is A
        r"The correct answer is\s*([A-D])\b", # The correct answer is A
        r"\b([A-D])\s*is the correct",       # A is the correct
        
        # Choice/option patterns
        r"choice\s*([A-D])\b",               # choice A
        r"option\s*([A-D])\b",               # option A
        r"select\s*([A-D])\b",               # select A
        r"choose\s*([A-D])\b",               # choose A
        
        # Concluding word patterns
        r"Therefore,?\s*([A-D])\b",          # Therefore, A
        r"Thus,?\s*([A-D])\b",               # Thus, A
        r"Hence,?\s*([A-D])\b",              # Hence, A
    ]
    
    # Try each pattern in the defined order
    for pattern in patterns:
        match = re.search(pattern, content, re.IGNORECASE)
        if match:
            captured = match.group(1).upper()
            # Handle numeric answers by mapping to choices if needed
            if captured.isdigit():
                # You might need to implement logic here to map numbers to letters
                # based on your specific answer choices
                continue
            return captured
    
    # Special handling for boxed numeric answers like "The final answer is $\boxed{14}$"
    # Try to match the numeric value with your answer choices
    numeric_boxed = re.search(r"\\boxed\{([0-9.]+)\}", content)
    if numeric_boxed:
        numeric_value = numeric_boxed.group(1)
        # You would need to compare this with your actual answer choices
        # and return the corresponding letter
        # For now, we'll continue to other patterns
        pass
    
    # If no specific pattern matches, look for isolated letters near the end
    lines = content.strip().split('\n')
    for line in reversed(lines[-10:]):  # Check the last 10 lines
        line = line.strip()
        if line in ['A', 'B', 'C', 'D']:
            return line
        # Check if a line contains only one of the possible answer letters
        letters_found = re.findall(r'\b([A-D])\b', line)
        if len(letters_found) == 1:
            return letters_found[0].upper()
    
    # As a last resort, find any occurrence of A, B, C, or D in the content
    all_letters = re.findall(r'\b([A-D])\b', content)
    if all_letters:
        # Return the last one found, as it's most likely the final answer
        return all_letters[-1].upper()
    
    return ""

# Additional helper function to map numeric answers to letters if needed
def map_numeric_to_letter(numeric_value, answer_choices):
    """
    Map a numeric value to the corresponding letter choice.
    Args:
        numeric_value (str): The numeric value extracted
        answer_choices (dict): Dictionary mapping letters to values
    Returns:
        str: The corresponding letter, or empty string if no match
    """
    try:
        num_val = float(numeric_value)
        for letter, choice_value in answer_choices.items():
            if isinstance(choice_value, (int, float)) and abs(float(choice_value) - num_val) < 0.01:
                return letter
    except (ValueError, TypeError):
        pass
    return ""

In [None]:
def validate_and_retry_if_needed(api_manager, image_path, question, choices, max_retries=3):
    """
    Try to get a valid answer letter, with retries if needed.
    """
    for attempt in range(max_retries):
        result = solve_geometry_problem_with_gemini(api_manager, image_path, question, choices)
        content = result['content']
        answer_letter = extract_answer_letter(content)
        
        if answer_letter in ['A', 'B', 'C', 'D']:
            return content, answer_letter
        
        print(f"Attempt {attempt + 1} failed to extract valid answer letter")
        logging.warning(f"Attempt {attempt + 1} failed for {image_path}: No valid answer extracted")
    
    # If all attempts fail, try one more time with a very direct prompt
    try:
        uploaded_file = api_manager.upload_file(image_path)
        direct_prompt = f"""Look at this geometry problem image and answer the question.

QUESTION: {question}
CHOICES: {choices}

⚠️ CRITICAL OUTPUT FORMAT REQUIREMENT ⚠️
YOU MUST END YOUR RESPONSE WITH EXACTLY ONE OF THESE FOUR LINES:
Final Answer: A
Final Answer: B
Final Answer: C
Final Answer: D

❌ ABSOLUTELY FORBIDDEN - DO NOT USE:
- "The final answer is $\\boxed{{14}}$"
- "The final answer is $\\boxed{{A}}$"
- "$\\boxed{{A}}$"
- "\\boxed{{A}}"
- "(A)"
- "A is correct."
- "Final Answer: The answer is A"
- Any LaTeX formatting
- Any mathematical notation
- Any additional text after the letter

✅ REQUIRED FORMAT EXAMPLES:
If you determine the answer is choice A: "Final Answer: A"
If you determine the answer is choice B: "Final Answer: B"
If you determine the answer is choice C: "Final Answer: C"
If you determine the answer is choice D: "Final Answer: D"

IMPORTANT: Your response must end with exactly "Final Answer: [SINGLE LETTER]" - nothing else on that line. Do not include any boxed notation, LaTeX, or mathematical formatting in your final line.

Begin your analysis now, and remember to end with the exact required format.
"""
        
        response = api_manager.generate_content(
            model="gemini-2.0-flash",
            contents=[uploaded_file, direct_prompt]
        )
        
        final_content = response.text.strip()
        final_letter = extract_answer_letter(final_content)
        
        return final_content, final_letter if final_letter in ['A', 'B', 'C', 'D'] else 'A'  # Default to A if still fails
    
    except Exception as e:
        logging.error(f"Error in final retry for {image_path}: {e}")
        return "Error occurred", 'A'

In [None]:
def main():
    """Main function to process geometry problems using Gemini Vision API."""
    
    # Initialize API keys
    api_keys = [
        "your api key"
    ]

    # Initialize API manager
    api_manager = GeminiVisionApiManager(
        api_keys=api_keys,
        calls_per_day=200,
        rate_limit_delay=4  # 4 seconds to stay under 15 RPM
    )

    # Input directories
    image_dir = "image"
    questions_dir = "question"
    choices_dir = "choices"
    
    # Output directories
    reasoning_output_dir = "/kaggle/working/reasoning_output"
    answer_literal_dir = "/kaggle/working/answer_literal_gemini_vision"
    os.makedirs(reasoning_output_dir, exist_ok=True)
    os.makedirs(answer_literal_dir, exist_ok=True)
    
    logging.info("Starting Gemini Vision geometry problem solving...")
    
    # Process problems from 2750 to 2799 (inclusive)
    for num in tqdm(range(2401, 3002), desc="Processing problems"):
        num_str = str(num)
        
        # Paths to input files
        image_path = os.path.join(image_dir, f"{num_str}.png")
        ques_path = os.path.join(questions_dir, f"{num_str}.txt")
        choice_path = os.path.join(choices_dir, f"{num_str}.txt")
        
        # Check if all required files exist
        if not all(os.path.exists(path) for path in [image_path, ques_path, choice_path]):
            print(f"Skipping problem {num_str}: Missing input files")
            logging.warning(f"Skipping problem {num_str}: Missing input files")
            continue
        
        # Read inputs
        try:
            with open(ques_path, "r", encoding='utf-8') as f:
                question = f.read().strip()
            with open(choice_path, "r", encoding='utf-8') as f:
                choices = f.read().strip()
        except Exception as e:
            print(f"Error reading files for problem {num_str}: {e}")
            logging.error(f"Error reading files for problem {num_str}: {e}")
            continue
        
        # Solve with validation and retry mechanism
        try:
            content, answer_letter = validate_and_retry_if_needed(
                api_manager, image_path, question, choices
            )
            
            # Build the reasoning file content
            reasoning_lines = []
            reasoning_lines.append("=" * 100)
            reasoning_lines.append("PROBLEM DETAILS:")
            reasoning_lines.append("=" * 100)
            reasoning_lines.append(f"IMAGE: {image_path}")
            reasoning_lines.append("")
            reasoning_lines.append(f"QUESTION:\n{question}")
            reasoning_lines.append("")
            reasoning_lines.append(f"CHOICES:\n{choices}")
            reasoning_lines.append("")
            
            reasoning_lines.append("=" * 100)
            reasoning_lines.append("GEMINI VISION MODEL RESPONSE:")
            reasoning_lines.append("=" * 100)
            reasoning_lines.append(content)
            reasoning_lines.append("")
            reasoning_lines.append("=" * 100)
            reasoning_lines.append(f"EXTRACTED ANSWER: {answer_letter}")
            reasoning_lines.append("=" * 100)
            
            reasoning_output = "\n".join(reasoning_lines)
            
            # Write reasoning output to file
            reasoning_out_path = os.path.join(reasoning_output_dir, f"{num_str}.txt")
            with open(reasoning_out_path, "w", encoding='utf-8') as f:
                f.write(reasoning_output)
            
            # Validate answer letter
            if not answer_letter or answer_letter not in ['A', 'B', 'C', 'D']:
                print(f"Warning: Invalid answer letter '{answer_letter}' for problem {num_str}")
                logging.warning(f"Invalid answer letter '{answer_letter}' for problem {num_str}")
                # Force a default answer rather than empty
                answer_letter = 'A'  # Default fallback
            
            # Write just the answer letter to a separate file
            letter_out_path = os.path.join(answer_literal_dir, f"{num_str}.txt")
            with open(letter_out_path, "w", encoding='utf-8') as f:
                f.write(answer_letter)
            
            print(f"Problem {num_str}: Answer = {answer_letter}")
            logging.info(f"Problem {num_str}: Answer = {answer_letter}")
            
            # Log API usage every 10 problems
            if num % 10 == 0:
                stats = api_manager.get_usage_stats()
                logging.info(f"API Usage: {stats['total_used']}/{stats['total_available']} calls ({stats['percent_used']:.2f}%)")
                
        except Exception as e:
            print(f"Error processing problem {num_str}: {e}")
            logging.error(f"Error processing problem {num_str}: {e}")
            
            # Write error to files
            error_content = f"Error processing problem: {str(e)}"
            
            reasoning_out_path = os.path.join(reasoning_output_dir, f"{num_str}.txt")
            with open(reasoning_out_path, "w", encoding='utf-8') as f:
                f.write(error_content)
            
            letter_out_path = os.path.join(answer_literal_dir, f"{num_str}.txt")
            with open(letter_out_path, "w", encoding='utf-8') as f:
                f.write('A')  # Default answer on error
            
            continue
    
    # Final usage statistics
    final_stats = api_manager.get_usage_stats()
    logging.info("Processing completed!")
    logging.info(f"Final API Usage: {final_stats['total_used']}/{final_stats['total_available']} calls ({final_stats['percent_used']:.2f}%)")
    print("Processing completed!")
    print(f"Final API Usage: {final_stats['total_used']}/{final_stats['total_available']} calls ({final_stats['percent_used']:.2f}%)")

In [None]:
if __name__ == "__main__":
    main()