# In [None]:
import os
from dotenv import load_dotenv
from google import genai
from google.genai import types
import pandas as pd
from datetime import datetime
import sqlite3
import json
from typing import List, Dict, Any, Optional
import re

load_dotenv()

# In [None]:
class GeminiProductExtractor:
    """Extract product names and prices from free text with Gemini and store them in SQLite.

    The extractor sends the text to a Gemini model together with a system
    prompt asking for a JSON array of products, parses the reply, and
    persists the result in a ``products`` table.

    Instances hold an open SQLite connection; call :meth:`close` when done,
    or use the instance as a context manager.
    """

    def __init__(self, api_key: Optional[str] = None,
                 model: str = "gemini-2.0-flash-thinking-exp",
                 db_path: str = "products_gemini.db"):
        """Initialize the Gemini-based product extractor.

        Args:
            api_key: Gemini API key. When omitted, the ``GEMINI_API_KEY``
                environment variable is used, then ``GOOGLE_API_KEY``.
            model: Name of the Gemini model used for extraction.
            db_path: Path of the SQLite database file (``":memory:"`` works).

        Raises:
            ValueError: If no API key is passed and none is found in the
                environment.
        """
        # Both variable names are accepted so the error message below and the
        # lookup can never disagree with each other.
        self.api_key = (
            api_key
            or os.environ.get("GEMINI_API_KEY")
            or os.environ.get("GOOGLE_API_KEY")
        )
        if not self.api_key:
            raise ValueError(
                "Gemini API key is required. Set the GEMINI_API_KEY (or GOOGLE_API_KEY) "
                "environment variable or pass the api_key parameter.")

        self.client = genai.Client(api_key=self.api_key)
        self.model = model

        # Connect to database
        self.setup_database(db_path)

    # NOTE: the two functions below are plain functions that live in the class
    # namespace; they are handed to sqlite3 while the class body executes and
    # are not meant to be called as instance methods.

    # Adapter to convert datetime objects to ISO format strings
    def adapt_datetime(dt):
        return dt.isoformat()

    # Converter to parse ISO format strings back to datetime objects
    def convert_datetime(bytestring):
        return datetime.fromisoformat(bytestring.decode())

    # Register the adapter for datetime objects (process-wide sqlite3 setting)
    sqlite3.register_adapter(datetime, adapt_datetime)

    # Register the converter for 'DATETIME' type (process-wide sqlite3 setting)
    sqlite3.register_converter('DATETIME', convert_datetime)

    def __enter__(self):
        """Return the extractor itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the database connection when the ``with`` block exits."""
        self.close()

    def setup_database(self, db_path: str = 'products_gemini.db'):
        """Open the SQLite database and create the ``products`` table if needed.

        Args:
            db_path: Path of the SQLite database file.
        """
        # PARSE_DECLTYPES makes sqlite3 run the registered 'DATETIME' converter
        # on the ``timestamp`` column when rows are read back.
        self.conn = sqlite3.connect(db_path, detect_types=sqlite3.PARSE_DECLTYPES)
        self.cursor = self.conn.cursor()

        # Create products table if it doesn't exist
        self.cursor.execute('''
        CREATE TABLE IF NOT EXISTS products (
            id INTEGER PRIMARY KEY,
            name TEXT,
            price REAL,
            currency TEXT,
            source_text TEXT,
            timestamp DATETIME
        )
        ''')
        self.conn.commit()

    @staticmethod
    def _parse_products(response_text: Optional[str], source_text: str) -> List[Dict[str, Any]]:
        """Turn a raw model reply into a list of product dicts.

        The reply is first parsed as JSON as-is; if that fails, the contents
        of a fenced ``json`` code block are tried. A single JSON object is
        treated as a one-element list, and non-object array items are dropped.
        Each returned product gets a ``"source_text"`` key set to
        ``source_text``.

        Returns:
            The parsed products, or an empty list when nothing usable is found.
            This function never raises on malformed replies.
        """
        if not response_text:
            print("Gemini returned an empty response")
            return []

        candidates = [response_text]
        fenced = re.search(r'```json\s*(.*?)\s*```', response_text, re.DOTALL)
        if fenced:
            candidates.append(fenced.group(1))

        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except json.JSONDecodeError:
                continue

            if isinstance(parsed, dict):
                parsed = [parsed]
            if not isinstance(parsed, list):
                # Valid JSON but not a product list (e.g. a bare number).
                continue

            products = [item for item in parsed if isinstance(item, dict)]
            for product in products:
                product["source_text"] = source_text
            return products

        print(f"Failed to parse Gemini response as JSON: {response_text}")
        return []

    def extract_products(self, text: str) -> List[Dict[str, Any]]:
        """Extract product names and prices from text using Gemini.

        Args:
            text: Free text that may mention products and prices.

        Returns:
            A list of product dicts as produced by the model, each with an
            added ``"source_text"`` key. An empty list is returned when the
            API call fails or the reply cannot be parsed; errors are printed,
            not raised.
        """
        system_prompt = """
        Extract all products and their prices mentioned in the text.
        Return a JSON array where each item has the following format:
        {
            "name": "Product Name",
            "price": 123.45,
            "currency": "$",
            "quantity": 1
        }

        Rules:
        1. Extract complete product names including brand and model
        2. Convert all prices to numeric values (no currency symbols in the price field)
        3. Identify the currency symbol used ($, €, £, etc.) and include it separately
        4. Return an empty array if no products with prices are detected
        5. Do not make up any information not present in the text
        6. Extract bangla text as well
        7. If quantity is mentioned, include it, otherwise default to 1
        """

        # Only the API interaction sits inside the try, so that parsing
        # problems are not misreported as API failures.
        try:
            response = self.client.models.generate_content(
                model=self.model,
                config=types.GenerateContentConfig(
                    system_instruction=system_prompt,
                    max_output_tokens=5000,
                    temperature=0.1
                ),
                contents=[text]
            )
            response_text = response.text
        except Exception as e:
            print(f"Error calling Gemini API: {e}")
            return []

        return self._parse_products(response_text, text)

    def store_products(self, products: List[Dict[str, Any]]):
        """Store extracted products in the database.

        Fields missing from a product dict are stored as NULL rather than
        aborting the batch. All rows are written in one transaction: either
        every product is stored or, on error, none is.

        Args:
            products: Product dicts with ``name``, ``price``, ``currency`` and
                ``source_text`` keys.
        """
        rows = [
            (product.get("name"),
             product.get("price"),
             product.get("currency"),
             product.get("source_text"),
             datetime.now())
            for product in products
        ]
        if not rows:
            return

        # The connection context manager commits on success and rolls back
        # if any insert raises.
        with self.conn:
            self.cursor.executemany(
                "INSERT INTO products (name, price, currency, source_text, timestamp) VALUES (?, ?, ?, ?, ?)",
                rows
            )

    def process_text(self, text: str) -> List[Dict[str, Any]]:
        """Extract products from ``text``, store them, and return them.

        Nothing is written to the database when no products are extracted.
        """
        products = self.extract_products(text)
        if products:
            self.store_products(products)
        return products

    def process_voice(self, audio_file_path: str):
        """
        Placeholder for voice processing functionality
        In a real implementation, this would use a speech-to-text API
        then pass the text to process_text()

        Currently only prints a notice and returns an empty list.
        """
        print(f"Voice processing not implemented in this demo. Would process: {audio_file_path}")
        return []

    def get_stored_products(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Retrieve the most recently stored products from the database.

        Args:
            limit: Maximum number of rows to return.

        Returns:
            A list of dicts with the keys ``id``, ``name``, ``price``,
            ``currency``, ``source_text`` and ``timestamp``, newest first.
        """
        self.cursor.execute(
            "SELECT id, name, price, currency, source_text, timestamp FROM products ORDER BY timestamp DESC LIMIT ?",
            (limit,)
        )
        columns = ["id", "name", "price", "currency", "source_text", "timestamp"]
        return [dict(zip(columns, row)) for row in self.cursor.fetchall()]

    def close(self):
        """Close database connection"""
        self.conn.close()

# In [None]:
# Demo usage
def run_gemini_demo():
    """Run a small end-to-end demo: extract products from sample text and list stored rows.

    Requires a Gemini API key. If the extractor cannot be created because no
    key is available, a hint is printed and the function returns without
    doing anything else.
    """
    # Initialize the extractor - make sure to set GEMINI_API_KEY in your environment
    # or pass it explicitly: GeminiProductExtractor(api_key="your-api-key")
    try:
        extractor = GeminiProductExtractor()
    except ValueError as e:
        # Only construction is guarded, so that unrelated ValueErrors raised
        # later are not misreported as a missing API key.
        print(f"Error: {e}")
        print("To run this demo, you need to provide a Google API key for Gemini.")
        print("Set it as an environment variable: export GEMINI_API_KEY=your-key-here")
        return

    # Sample texts
    sample_texts = [
        "amare 5kg chal den 1000 taka",
    ]

    try:
        # Process each sample text
        for text in sample_texts:
            print(f"\nProcessing text: {text}")
            products = extractor.process_text(text)
            # ensure_ascii=False keeps non-ASCII (e.g. Bangla) text readable
            print(f"Extracted products: {json.dumps(products, indent=2, ensure_ascii=False)}")

        # Show all stored products
        stored_products = extractor.get_stored_products()
        print("\nStored products in database:")
        for product in stored_products:
            source_preview = (product['source_text'] or '')[:30]
            print(
                f"ID: {product['id']} | {product['name']} - {product['currency']}{product['price']} | Source: '{source_preview}...'")
    finally:
        # Close the connection even if processing fails
        extractor.close()


# Run the demo only when this file is executed directly, not when it is imported.
if __name__ == "__main__":
    run_gemini_demo()