In [1]:
import os, time, random, logging
from typing import List, Dict, Optional
import requests
from dotenv import load_dotenv
from gen_utils.parsing_utils import retrieve_secret
import snowflake
import snowflake.connector
import pandas as pd
from langchain_core.tools import tool
from Models.gemini_model import GeminiModel
from pydantic import BaseModel
import requests, re
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from collections import deque
import textwrap


In [None]:

def wrapped_print(text: str, width: int = 100):
    print(textwrap.fill(text, width=width))


@tool
def scrape_google_snippet_urls(query: str) -> str:
    """
    Given a UPC and product name, does a Google search for "<UPC> <product_name> size"
    and returns the first result.
    Args:
        query (str): The search query, typically a UPC and product name.
    Returns:
        str: The results from the Google search.
    """
    retrieve_secret("generalized-parser-des","cd-ds-384118")

    # Load environment variables from .env file
    load_dotenv(override=True)

    # Set up logging
    logger = logging.getLogger("app_logger")

    # Set up Google Custom Search API credentials
    CUSTOM_SEARCH_URL = os.getenv("CUSTOM_SEARCH_URL")       # e.g. https://customsearch.googleapis.com/customsearch/v1
    CUSTOM_SEARCH_API = os.getenv("CUSTOM_SEARCH_API")       # API key
    SEARCH_ENGINE_ID   = os.getenv("SEARCH_ENGINE_ID")       # CX id

    class GoogleSearchError(RuntimeError):
        pass

    class GoogleScraper:
        def __init__(self, sleep_min: float = 2.0, sleep_max: float = 6.0) -> None:
            self.sleep_min = sleep_min
            self.sleep_max = sleep_max

        def search(self, query: str, num_results: int = 5) -> List[Dict[str, str]]:
            """
            Perform a Google search using the Custom Search API.

            Args:
                query (str): The search query.
                num_results (int): Number of results to return.
            Returns:
                List[Dict[str, str]]: List of search results, each containing title, link, and snippet.
            """
            time.sleep(random.uniform(self.sleep_min, self.sleep_max))
            params = {
                "key": CUSTOM_SEARCH_API,
                "cx":  SEARCH_ENGINE_ID,
                "q":   query,
                "num": num_results,
            }
            r = requests.get(CUSTOM_SEARCH_URL, params=params, timeout=30)
            if r.status_code != 200:
                raise GoogleSearchError(f"{r.status_code}: {r.text[:200]}")
            items = r.json().get("items", [])
            return [
                {
                    "title":    it.get("title", ""),
                    "link":     it.get("link", ""),
                    "snippet":  it.get("snippet", ""),
                }
                for it in items
            ]


    scraper_instance = GoogleScraper()

    return str(scraper_instance.search(query))



@tool
def scrape_website_text(start_url: str, same_domain_only: bool = True, timeout: int = 10) -> str:
    """
    Crawl `start_url` (depth-first, same page only) and extract all visible text.

    Parameters
    ----------
    start_url : str
        The page to fetch.
    same_domain_only : bool, default True
        If True, ignore links that point to a different domain.
    timeout : int, default 10
        Seconds to wait for HTTP requests.

    Returns
    -------
    str
        Concatenated visible text from the page.
    """
    try:
        def _visible_text(html: str) -> str:
            soup = BeautifulSoup(html, "html.parser")

            # remove unwanted nodes
            for tag in soup(["script", "style", "noscript", "header",
                            "footer", "svg", "meta", "link",
                            "iframe", "nav", "form"]):
                tag.decompose()

            # get text, strip whitespace, collapse runs of spaces
            text = " ".join(soup.stripped_strings)
            return re.sub(r"\s+", " ", text)

        # --- fetch page --------------------------------------------------------
        try:
            resp = requests.get(start_url, timeout=timeout, headers={"User-Agent": "Mozilla/5.0"})
            resp.raise_for_status()
        except requests.RequestException as exc:
            raise RuntimeError(f"Failed to fetch {start_url}: {exc}") from exc

        page_text = _visible_text(resp.text)

        # optionally recurse over internal links ↓ (comment out if not needed)
        domain = urlparse(start_url).netloc
        seen, stack = {start_url}, deque()

        soup = BeautifulSoup(resp.text, "html.parser")
        for link in soup.find_all("a", href=True):
            url = urljoin(start_url, link["href"])
            if url in seen:
                continue
            if same_domain_only and urlparse(url).netloc != domain:
                continue
            stack.append(url)
            seen.add(url)

        while stack:
            url = stack.pop()
            try:
                r = requests.get(url, timeout=timeout, headers={"User-Agent": "Mozilla/5.0"})
                r.raise_for_status()
                page_text += " " + _visible_text(r.text)
            except requests.RequestException:
                continue  # skip unreachable / 404 pages

        return page_text
    except Exception as e:
        return f"Error: {str(e)}"

@tool
def parse_google_snippet_urls(url_text: str) -> List[str]:
    """ 
    Extract a list of urls to scrape from the input google snippets.
    Args:
        url_text (str): The text containing the Google snippets.
    Returns:
        List[str]: A list of URLs extracted from the Google snippets.
    """

    model = GeminiModel()

    class GeminiModelResponse(BaseModel):
        urls: List[str]


    system_instruction = f"""

    Extract a list of urls to scrape from the input google snippets.

    """

    user_instruction = f"Google Snippets: {url_text}"

    output = model.generate_response(
        system_instruction,
        user_instruction,
        GeminiModelResponse,
        response_format_flag = True  
    )

    return output['urls']

@tool
def parse_recipe(scraped_text: str) -> dict:
    """
    Parse the scraped text to extract ingredients and recipe instructions.

    Args:
        scraped_text (str): The scraped text from a website or package label.

    Returns:
        str: The parsed recipe with ingredients and instructions.
    """
    # Initialize the Gemini model

    model = GeminiModel()

    class GeminiModelResponse(BaseModel):
        recipe_name: str
        ingredients: str
        recipe: str
        url: str


    system_instruction = f"""

    ** S Ingredient Extraction**

    You are an expert food ingredient extraction assistant.
    Your task is to 
    1. extract a list of **ingredients with quantities and proportions** from raw scraped product text (such as from a website or package label).
    2. extract and summarize the recipe into a nice text paragraph that is simple and easy to understand/follow.

    ### Instructions

    * Read the full input text carefully.
    * Extract **only the ingredient list**.
    * If **amounts or proportions** are specified (e.g. "1 tsp", "10%"), include them.
    * If there is **no quantity**, include the ingredient as-is.
    * Return the ingredients in **markdown format**, as a **bulleted list**.
    * Each line should be of the form: `- ingredient name (quantity)`

    ### Output Format (Markdown)

    * Return the ingredients in **markdown format**, as a **bulleted list** 
    - Each line should be of the form: `- (quantity) (unit) (ingredient name) (preparation method (optional))`

    example: 
    ```
    - 1 tsp. anchovies mashed
    - 2 cloves of garlic mashed
    - 1 tsp. dijon 
    ```

    * Return the recipe as a text paragraph with commas separating the steps.

    example: 
    ```
    Take crisp romaine lettuce, tossed in a creamy dressing made from egg yolk, Dijon mustard, lemon juice, 
    Worcestershire sauce, garlic, and olive oil. Grated Parmesan cheese adds a salty richness, while freshly
    ground black pepper enhances the flavor. Top it with crunchy croutons for texture and a little extra cheese 
    if desired. Serve immediately for the freshest taste.
    ```

    ### Rules

    * Do not add commentary or explanation.
    * If no ingredients are found, return an empty markdown list.

    ---

    Would you like me to generate few-shot examples for this too?


    """

    user_instruction = f"Scraped Text: {scraped_text}"


    output = model.generate_response(
        system_instruction,
        user_instruction,
        GeminiModelResponse,
        response_format_flag = True  
    )

    return output.model_dumps()

In [5]:

# sun_dried = 'https://www.themediterraneandish.com/sun-dried-tomato-chicken/'

# sausage_potato_kale_soup = 'https://www.allrecipes.com/recipe/231287/sausage-potato-and-kale-soup/'

# input_url = sausage_potato_kale_soup

url_text = scrape_google_snippet_urls('Best keylime pie recipes') 

input_urls = parse_google_snippet_urls(url_text)

scraped_text = scrape_website_text(input_urls[0])

output = parse_recipe(scraped_text)

RuntimeError: Failed to fetch https://www.thekitchn.com/key-lime-pie-recipe-showdown-23568483: 403 Client Error: Forbidden for url: https://www.thekitchn.com/key-lime-pie-recipe-showdown-23568483

In [6]:
print(output['recipe_name'], '\n')
print(output['ingredients'], '\n')

print("Website: ", output['url'], '\n')


wrapped_print(output['recipe'])

NameError: name 'output' is not defined