<a href="https://colab.research.google.com/github/Alexrosulek/Cs50/blob/main/gen.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!curl https://ollama.ai/install.sh | sh

!echo 'debconf debconf/frontend select Noninteractive' | sudo debconf-set-selections
!sudo apt-get update && sudo apt-get install -y cuda-drivers

import os

# Set LD_LIBRARY_PATH so the system NVIDIA library
os.environ.update({'LD_LIBRARY_PATH': '/usr/lib64-nvidia'})
!nohup ollama serve &
!pip install pyOpenSSL==24.2.1


# Pull Ollama Models
!ollama pull gemma3:1b
!ollama pull gemma3:4b
# Install Packages
!pip install -q ollama crawl4ai aiohttp pillow beautifulsoup4 wikipedia googlesearch-python playwright nest_asyncio
!playwright install chromium
!nohup ollama serve &






  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 13281    0 13281    0     0  41269      0 --:--:-- --:--:-- --:--:-- 41245
>>> Cleaning up old version at /usr/local/lib/ollama
>>> Installing ollama to /usr/local
>>> Downloading Linux amd64 bundle
######################################################################## 100.0%
>>> Adding ollama user to render group...
>>> Adding ollama user to video group...
>>> Adding current user to ollama group...
>>> Creating ollama systemd service...
>>> The Ollama API is now available at 127.0.0.1:11434.
>>> Install complete. Run "ollama" from the command line.
Hit:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:2 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:3 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:4 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:5

In [None]:
# Imports
import nest_asyncio
import asyncio
import random
import time
import aiohttp
import shutil
import os
import re
import base64
import io
import subprocess
import requests
import textwrap
from PIL import Image
from bs4 import BeautifulSoup
import wikipedia

from urllib.parse import urlparse
from googlesearch import search

# Crawl4AI Imports (Valid Ones)
from crawl4ai import LLMConfig, LLMExtractionStrategy, CrawlerRunConfig
from crawl4ai.deep_crawling.filters import FilterChain, ContentRelevanceFilter

from crawl4ai.deep_crawling import BFSDeepCrawlStrategy
import json
from crawl4ai.extraction_strategy import CosineStrategy

from crawl4ai.async_configs import BrowserConfig

from crawl4ai import AsyncWebCrawler
from crawl4ai.async_configs import BrowserConfig


print("✅ Environment Fully Ready!")

API_BASE = "https://www.nearestdoor.com"  # Replace with actual server URL
CLIENT_ID = "client001"
HEARTBEAT_INTERVAL = 60  # seconds
SHOP_FLOW_STATIC = [
    "search", "aggregate", "createplan", "create",
    "find_available_fields", "extract_fields_from_aggregate", "fillintheshop"
]
# --------------------------------------------------------------------------- #
# 🧠  LIGHT‑WEIGHT LOCAL LLM EXECUTION                                       #
# --------------------------------------------------------------------------- #
class OllamaRunner:
    """
    `ollama run …`

    """
    def __init__(self, default_model: str = "gemma3:1b", default_timeout: int = 300):
        self.default_model = default_model
        self.default_timeout = default_timeout

    def run(self, prompt: str, model: str | None = None, timeout: int | None = None) -> str:
        model = model or self.default_model
        timeout = timeout or self.default_timeout
        print(f"🧠 Running Ollama: {model}")

        try:
            proc = subprocess.run(
                ["ollama", "run", model],
                input=prompt.encode("utf-8"),
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                timeout=timeout,
            )
            return proc.stdout.decode("utf-8").strip()
        except Exception as e:
            print(f"❌ Ollama execution failed: {e}")
            return ""


# --------------------------------------------------------------------------- #
# 🌐  LOOK‑UP ENGINE                                                          #
# --------------------------------------------------------------------------- #
class LookupEngine:
    """
    – Validates every URL first
    – Google results exclude Yelp & Reddit and are content‑checked
    – Yelp & Reddit results are *also* content‑checked before ‘battling’
    – At most one Yelp URL & one Reddit URL are returned
    – Wikipedia returns at most one page (auto_suggest)
    """
    def __init__(self, crawler, ollama_runner: OllamaRunner | None = None):
        self.crawler = crawler
        self.llm_config = LLMConfig(provider="ollama/gemma3:1b")
        self.ollama = ollama_runner or OllamaRunner()
        self.crawler_manager = self.CrawlerManager()
    async def initialize(self):
        await self.crawler_manager.start()


    # ---------------------  LOW‑LEVEL HELPERS  ----------------------------- #
    class CrawlerManager:
        def __init__(self):
            self.crawler = None
        async def start(self):
            if self.crawler is None:
                self.crawler = AsyncWebCrawler(config=BrowserConfig())
                await self.crawler.__aenter__()

        async def stop(self):
            if self.crawler:
                await self.crawler.__aexit__(None, None, None)
                self.crawler = None

        async def crawl(self, url: str, config: CrawlerRunConfig | None = None):
            if not self.crawler:
                raise RuntimeError("Crawler not started")

            url = self._normalize_and_validate_url(url)
            if not url:
                return None

            try:
                result = await self.crawler.arun(url=url, config=config or CrawlerRunConfig())
                return result
            except Exception as e:
                print(f"❌ crawl error for {url}: {e}")
                return None

        def _normalize_and_validate_url(self, url: str) -> str | None:
            try:
                url = url.lower()
                if not url.startswith(("http://", "https://")):

                    url = "https://" + url

                parsed = urlparse(url)
                if parsed.scheme not in ["http", "https"] or not parsed.netloc or "." not in parsed.netloc or " " in parsed.netloc or "/http" in parsed.netloc:
                    return None

                return url
            except Exception as e:
                print(f"❌ URL normalization failed: {e}")
                return None


    async def _extract_snippet(self, url: str, max_chars: int, min_chars: int) -> str | None:
        try:
            resp = requests.get(
                url, timeout=10,
                headers={"User-Agent": "Mozilla/5.0 (compatible; LookupEngine/1.0)"}
            )
            if resp.status_code != 200:
                return None

            soup = BeautifulSoup(resp.text, "html.parser")
            parts: list[str] = []

            if soup.title:
                parts.append("Meta Title: "+ soup.title.string.strip())

            desc = soup.find("meta", attrs={"name": "description"})
            if desc and desc.get("content"):
                parts.append("Meta Desc: "+ desc["content"].strip())

            heads = [h.get_text(strip=True) for h in soup.find_all(["h1", "h2", "h3"])][:3]
            parts.extend(["Headers: " + h for h in heads])


            para = next((p.get_text(strip=True) for p in soup.find_all("p") if len(p.get_text(strip=True)) > 50), "")
            if para:
                parts.append("Paragraph" + para)
            if len("".join(parts)) < min_chars:
                return None
            return f"Snippet From {url}:\n" + "\n".join(parts)[:max_chars]


        except Exception as e:
            print(f"❌ Snippet extraction failed for {url}: {e}")
            return None


    async def _basic_url_checker(self, url: str, shop_name: str, shop_type: str) -> bool:
        try:

            """
            Improved relevance check for a URL:
            1. Uses Crawl4AI's ContentRelevanceFilter for semantic similarity.
            2. Falls back to LLM prompt decision if semantic check is inconclusive.
            """

            if not url:
                return False

            # Step 1: Semantic Query Based on Shop Type
            semantic_query = self.get_semantic_query(shop_type, shop_name)
            relevance_filter = ContentRelevanceFilter(
            query=semantic_query,
            threshold=0.4
        )

            crawl_config = CrawlerRunConfig(
                deep_crawl_strategy=BFSDeepCrawlStrategy(
                    max_depth=1,
                    filter_chain=FilterChain([relevance_filter])
                ),
                word_count_threshold=20,
                excluded_tags=["script", "style", "footer", "nav"],
                exclude_social_media_links=True,
                exclude_external_links=True
            )


            url = self.crawler_manager._normalize_and_validate_url(url)



            if not url:
                return False
            result = await self.crawler_manager.crawl(url, config=crawl_config)


            if result.success and result.extracted_content:
                    # Passed semantic filtering — consider it a valid URL
                print(f"✅ Semantic check passed for: {url}")
                return True


            # Step 2: Fallback — Quick Snippet and LLM Yes/No Decision
            snippet = await self._extract_snippet(url, 500, 100)
            if not snippet:
                return False

            prompt = (
                f"Is the following content about the {shop_type} named '{shop_name}'? "
                f"Answer only `true` or `false` or `none`.\n\n{snippet}"
            )

            decision = self.ollama.run(prompt)
            if "true" in decision.lower():
                print(f"✅ LLM confirmed relevance for: {url}")
                return True

            print(f"⚠️ URL deemed irrelevant: {url}")
            return False

        except Exception as e:
            print(f"error in basicc checker", {url}, {shop_name}, {shop_type}, e)
            return False
    @staticmethod
    def get_semantic_query(shop_type, shop_name):
        queries = {
            "church": f"{shop_name}, history, review, hours, muslim, phone, church, christian, church events, holiday schedules, mass times, sermons, church history, community programs, accessibility options, FAQs, donation methods, parking, contact information",
            "plasma_center": f"{shop_name}, history, review, stocked brands, review, hours, phone, plasma, plasma donation requirements, compensation rates, donor reward, donor eligibility, contact details, operating hours, health guidelines, FAQ, appointment scheduling, safety procedures",
            "thrift_store": f"{shop_name}, history, review, stocked brands, second hand,  review, hours, phone, thrift, store hours, donation guidelines, accepted items, discounts, sales events, store history, accessibility, contact info, volunteer programs, reviews",
            "dog_park": f"{shop_name}, history, review, water, shade, agility equipment, park, review, hours, phone, dog, dog park hours, leash rules, pet-friendly areas, dog-friendly facilities, park amenities, accessibility options, entry fees, safety tips, events, pet policies, reviews",
        }
        return queries.get(shop_type.lower(), "business information, contact details, operating hours, reviews, FAQs, history")

    async def _get_site_content(

            self,
            url: str,
            shop_name: str,
            shop_type: str,

            ) -> str | None:
        try:
        # Step 1: Semantic Filter Based on Shop Type
            semantic_query = self.get_semantic_query(shop_type, shop_name)

            cosine_strategy = CosineStrategy(
                semantic_filter=semantic_query,
                word_count_threshold=20,  # Filter short content blocks early
                sim_threshold=0.35,       # Loose enough to capture diverse relevant content
                top_k=8,                  # Top 5 most relevant content clusters
                model_name="sentence-transformers/all-MiniLM-L6-v2",
                verbose=True
            )

            crawl_config = CrawlerRunConfig(
                extraction_strategy=cosine_strategy,
                excluded_tags=["script", "style", "header", "footer", "nav"],
                exclude_external_links=False,
                exclude_social_media_links=False,
                word_count_threshold=20,
                process_iframes=True,
                remove_overlay_elements=True
            )

            # ✅ Use CrawlerManager properly
            url = self.crawler_manager._normalize_and_validate_url(url)
            if not url:
                return None

            result = await self.crawler_manager.crawl(url, config=crawl_config)

            if not result or not result.success or not result.extracted_content:
                print(f"❌ Semantic content extraction failed for {url}")
                return None

            # ✅ Extract content safelytry:
            try:
                extracted_data = json.loads(result.extracted_content)
            except json.JSONDecodeError:
                print(f"❌ Invalid JSON from {url}")
                return None

            if not extracted_data:
                print(f"⚠️ No content extracted using semantic clustering for {url}")
                return None

            content_chunks = [item["content"] for item in extracted_data]
            combined_content = "\n\n".join(content_chunks)

            return combined_content


        except Exception as e:
            print(f"❌ Failed to parse site content: {e}")
            return None
    # ---------------------  LOOK‑UP ROUTINES  ------------------------------ #
    async def wikipedia_lookup(self, name: str, city: str, shop_type: str) -> str | None:
        try:
            query = f"{name} {city} {shop_type}".strip()
            print(f"📚 Wikipedia lookup → {query}")
            page = wikipedia.page(query, auto_suggest=True)
            content = page.content

            if len(content) <= 2000:
                return content
            chunks = [content[i:i + 500] for i in range(0, len(content), 500)]
            # Fallback-safe middle extraction
            if len(chunks) > 6:
                middle = chunks[2:-2]  # Remove first and last 2 chunks
                if not middle:
                    middle = chunks  # If middle is empty, fallback to all chunks
            else:
                middle = chunks

            # Intelligent selection
            if len(middle) > 6:
                selected = random.sample(middle, 6)  # Randomly select 6 if too many
            else:
                selected = middle  # Take all available if 6 or fewer
            formatted_chunks = [f"\nWIKI CHUNK {idx + 1}:\n{chunk}" for idx, chunk in enumerate(selected)]

            return f"ALL EXTRACTED WIKIPEDIA SEARCH INFO FOR {name}:\n" + "\n".join(formatted_chunks)

        except Exception as e:
            print(f"❌ Wikipedia fetch failed: {e}")
            return None

    async def search_lookup(self,  name: str,  shop_type: str, query: str, placename: str, amount: int, isyelp: bool=False) -> str | None:
        try:
            print(f"🔎 Google search → {query}")

            raw = list(search(query, amount))
            if not isyelp:
                candidate_urls = [u for u in raw if "yelp" not in u.lower() and "reddit" not in u.lower()]
            else:

                candidate_urls = [u for u in raw if "yelp" in u.lower() or "reddit" in u.lower()]
            good_content = []
            for i, url in enumerate(candidate_urls):


                if not url:
                    continue
                if not await self._basic_url_checker(url, name, shop_type):
                    continue

                content = await self._get_site_content(url, name, shop_type)
                if content:
                    good_content.append(f"\n← {placename} SEARCH DATA SITE {i} FROM: {url}\n {content}")

            return f"ALL EXTRACTED {placename} SEARCH DATA FOR {name}\n".join(good_content) if good_content else None
        except Exception as e:
            print(f"❌ Failed to parse search lookup results: {e}")
            return None
    # ---------------------  PUBLIC ENTRY POINT  ---------------------------- #
    async def combined_search(self, name: str, city: str, state: str, shop_type: str, website_url: str) -> tuple[bool, str | None, None]:
        print("🌐 Starting combined search…")

        Google_query = f"{name} {city} {state} {shop_type} "

        Yelp_query = f"{name} {city} {state} {shop_type} site: yelp.com "

        Reddit_query = f"{name} {city} {state} {shop_type} site: reddit.com "
        if website_url:
            Official_query = f"{name} site: {website_url} "

        g_res = await self.search_lookup(name, shop_type, Google_query, "Google", 10, False)

        y_res = await self.search_lookup(name, shop_type, Yelp_query, "Yelp", 5, True)

        r_res = await self.search_lookup(name, shop_type, Reddit_query, "Reddit", 5, True)
        w_res = await self.wikipedia_lookup(name, city, shop_type)
        o_res = None
        if website_url:
            o_res = await self.search_lookup(name, shop_type, Official_query, f"Official Website of {name}", 5, True)
        main = ""
        if y_res:
            main += y_res
        if r_res:
            main += r_res
        if g_res:
            main += g_res
        if w_res:
            main += w_res
        if o_res:
            main += o_res
        if len(main) < 500:
            print("❌ Not enough content gathered.")
            return False, None, None

        print("✅ Combined search complete.")
        return True, main, None
# --------------------------------------------------------------------------- #
# 📦  HIGH‑LEVEL CONTENT SUMMARIZER                                          #
# --------------------------------------------------------------------------- #
class ContentSummarizer:
    """
    Reduce a large blob of text about a specific business down to ≤ max_final_chars
    while preserving high‑value facts. Uses multi-stage LLM summarisation with
    chunk filtering and escalation if necessary.
    """
    def __init__(
        self,
        ollama_runner: OllamaRunner,
        shop_name: str,
        shop_type: str,
        city: str | None = None,
        state: str | None = None,
        max_final_chars: int = 6000,
        min_final_chars: int = 500,
    ):
        self.ollama = ollama_runner
        self.shop_name = shop_name
        self.shop_type = shop_type
        self.city = city or ""
        self.state = state or ""
        self.max_final_chars = max_final_chars
        self.min_final_chars = min_final_chars

    # ----------------- Internal Helpers ----------------- #
    def _clean_raw_content(self, content: str) -> str:
        lines = content.splitlines()
        cleaned, seen = [], set()
        NOISE = [
            "cookie policy", "all rights reserved", "subscribe", "advertisement",
            "accept cookies", "privacy policy", "terms of service", "sign in", "cookie"
        ]
        for line in lines:
            line = line.strip()
            lo = line.lower()
            if len(line) < 30 or lo in seen:
                continue
            if any(noise in lo for noise in NOISE):
                continue
            seen.add(lo)
            cleaned.append(line)
        return "\n".join(cleaned)

    def _chunk_text(self, text: str, chunk_size: int) -> list[str]:
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

    def _filter_chunks(self, chunks: list[str], model: str = "gemma3:1b") -> list[str]:
        try:
            good = []
            for chunk in chunks:
                prompt = (
                    f"Is the following content useful for creating a profile for the {self.shop_type} "
                    f"'{self.shop_name}'? Only reply 'true' or 'false'.\n\n{chunk}"
                )
                decision = self.ollama.run(prompt, model=model).strip().lower()
                if "true" in decision:
                    good.append(chunk)

            return good
        except Exception as e:
            print(f"❌ Failed to filter chunk: {e}")
            return None

    def _build_prompt(self, text_chunk: str) -> str:
        return (
            f"You are creating a SHORT high-quality summary for "
            f"{self.shop_type} **{self.shop_name}** "
            f"{'in ' + self.city if self.city else ''}{', ' + self.state if self.state else ''}.\n\n"
            f"Keep ALL USEFUL DATA"
            "KEEP SERVICES.\n"
            f"KEEP URLS ONLY WHEN IT IS THE LITERAL {self.shop_name}'s WEBSITE.\n"
            f"KEEP ALL GOOD INFO AND HISTORY, FACTS, INFO, Extract all usefull info, Example: \n"
            f"• Keep Contact info (phone, email)\n"
            f"• Keep Official website URLs only if directly related to {self.shop_name}\n"
            f"• Keep Operating hours, holiday hours, KEEP ALL USEFUL ANY DATA\n"
            f"• Keep Brands Price range, services, FAQs, key facts, reviews, history\n"
            f"• Keep Reviews sentiment, pros/cons, unique selling points\n"
            f"• Keep Awards, certifications, reviews, press articles\n\n"
            f" remove unrelated info.\n"

            f"--- SOURCE TEXT START ---\n{text_chunk}\n--- SOURCE TEXT END ---"

        )

    def _summarize_with_ollama(self, text: str, model: str) -> str:
        try:

            prompt = self._build_prompt(text)
            return self.ollama.run(prompt, model=model)
        except Exception as e:
            print(f"❌ Failed to summarize w ollama: {e}")
            return None
    def summarize_chunks(
        self,
        content: str,
        chunk_size: int = 2000,
        initial_model: str = "gemma3:1b",

    ) -> str:
        try:
            chunks = self._chunk_text(content, chunk_size)
            content = self._filter_chunks(chunks)
            if not content:
                content = chunks
            filtered_content = ''.join(content)
            if len(filtered_content) < self.min_final_chars:
                return None
            if len(filtered_content) < self.max_final_chars:
                return filtered_content

            summarized_chunks = []

            for idx, chunk in enumerate(chunks, start=1):
                print(f"📚 Summarizing chunk {idx}/{len(chunks)}...")
                summary = self._summarize_with_ollama(chunk, model=initial_model)

                if not summary or len(summary) < 50:
                    print(f"⚠️ Failed to summarize chunk {idx}, keeping raw content.")
                    summary = chunk  # Fallback to raw content if summary failed

                summarized_chunks.append(f"### CHUNK {idx} SUMMARY:\n{summary}")

                # Early exit: check if adding all remaining raw chunks without summarizing fits within limit
                combined_so_far = "\n\n".join(summarized_chunks)
                remaining_raw = "".join(chunks[idx:])  # Remaining chunks after current one

                if len(combined_so_far) + len(remaining_raw) <= self.max_final_chars:
                    print(f"✅ Early exit: current summary + remaining raw fits within limit. Skipping further summarization.")
                    for r_idx, remaining_chunk in enumerate(chunks[idx:], start=idx + 1):
                        summarized_chunks.append(f"### CHUNK {r_idx} (Raw):\n{remaining_chunk}")
                    break

            combined_summary = "\n\n".join(summarized_chunks)
            if len(combined_summary) < self.min_final_chars:
                return None
            # Final trim if absolutely necessary
            if len(combined_summary) > self.max_final_chars:
                print("⚠️ Final combined summary exceeds character limit. Trimming result.")
                return combined_summary[:self.max_final_chars]

            return combined_summary
        except Exception as e:
            print(f"❌ Failed to sumarrize chunks: {e}")
            return None


    def summarize_content(self, raw_content: str) -> str:
        print("🧹 Cleaning raw content...")
        cleaned = self._clean_raw_content(raw_content)
        if len(cleaned) <= self.max_final_chars:
            print("✅ Cleaned content fits within final character limit.")
            return cleaned

        final_summary = self.summarize_chunks(
        content=cleaned,           # The large raw text content you want to reduce
        chunk_size=1500,                # Size of each chunk before summarizing
        initial_model="gemma3:1b",      # Start with the lightweight model

    )
        if final_summary is None:
            print("❌ Summarization failed. Returning cleaned content instead.")
            return cleaned[:self.max_final_chars]

        print("🎉 Final summarization complete.")
        return final_summary

import re
import json

class Smartypants:
    def __init__(self, ollama_runner: OllamaRunner):
        self.ollama = ollama_runner

    def _run4(self, prompt: str) -> str:
        return self.ollama.run(prompt, model="gemma3:4b")
    def _run1(self, prompt: str) -> str:
        return self.ollama.run(prompt, model="gemma3:1b")

    # ------------ PLAN ------------ #
    def create_plan(self, aggregate: str,shop_name: str, shop_type: str,  city: str, state: str) -> tuple[bool, list[str]]:
        prompt = (
            "You have an aggregated summary about a "

            f"place called {shop_name} {shop_type} in {city}, {state}\n\n"
            f"{aggregate}\n\n"
            "Which content sections can confidently be generated based on this?\n"
            "Options: article, faq, history.\n"
            "Reply with a correct python comma-separated list of available sections to write about, nothing else, Options: article, faq, history."
        )
        try:
            count = 0
            resp = self._run4(prompt).lower()
            valid = {"article", "faq", "history"}
            for s in valid:
                if s not in resp:
                    count +=1
            if count == 3:
                return False

            return True, [s for s in valid if s in resp]
        except Exception as e:
            print(f"❌ create_plan error: {e}")
            return False, []

    def check_aggregate_quality(self, shop_name: str, aggregate: str,shop_type: str,  city: str, state: str) -> bool:
        prompt = (
            f"Check if this content contains usefull info about {shop_name} {shop_type} in {city}, {state}.\n\n{aggregate}\n\n"
            "Reply only `true` or `false`."
        )
        try:
            return "true" in self._run1(prompt).lower()
        except Exception as e:
            print(f"❌ quality check error: {e}")
            return False

    # ------------ SECTION VALIDATION / FIX ------------ #
    def validate_section_html(self, shop_name: str, section: str, text: str) -> bool:
        prompt = (
            f"Validate the following text for section '{section}'. It is supposed to be about '{shop_name}'.\n\n{text}\n\n"
            "Rules:\n- No HTML.\n- No irrelevant info. Is it useful and no format or weird characters? Nothing Else, Nothing before or after our content\n"
            "- Only factual, structured, and clear content.\n- Reply `true` or `false` only."
        )
        return "true" in self._run1(prompt).lower()

    def fix_section_html(self, shop_name: str, section: str, text: str) -> str | None:
        prompt = (
            f"Clean and fix this section '{section}'s format. It is about '{shop_name}'.\n\n{text}\n\n"
            "Rules:\n- No HTML, Is it useful and and no format or weird characters? asterisks, or irrelevant info.\n"
            "Return only the final cleaned text for consumers on nearestdoor.com to read, no junk, no explanations, nothing else, nothing before or after our content. Only return the corrected text."
        )
        return self._run4(prompt).strip()

    # ------------ JSON & FIELD EXTRACTION ------------ #
    def extract_clean_json_structure(self, text: str) -> dict | None:
        try:
            match = re.search(r"\{.*?\}", text.strip(), re.DOTALL)
            if not match:
                return None

            match_text = match.group(0).lower()
            exclusion_keywords = ['n/a', 'n-a', 'none', 'false', 'na', 'cant', 'not', 'found', 'unable', '{{', '()', 'unavailable']

            if any(bad in match_text for bad in exclusion_keywords):
                return None

            json_ready = match.group(0).replace("'", '"')
            return json.loads(json_ready)

        except Exception:
            return None


    def extract_available_fields(self, aggregate: str,shop_name: str, shop_type: str,  city: str, state: str) -> tuple[bool, list[str]]:
        try:
            field_list = list(FIELD_EXTRACTORS.keys())
            field_str = ', '.join(field_list)
            prompt = (
                f"Analyze the content:\n\n{aggregate}\n\n"
                f"Whos data we want {shop_type}, {shop_name}. "
                f"Which of these fields can be confidently extracted?\n{field_str}\n"
                "Reply ONLY with the field keys that are present in the text, as ONLY the correct format requested. no junk. Nothing else. "
                "If none, reply exactly 'FALSE'."
            )
            response = self._run4(prompt).strip().lower()
            if "false" in response:
                return True, []
            detected = [field for field in field_list if field.lower() in response]
            return True, detected
        except Exception as e:
            print(f"❌ extract_available_fields error: {e}")
            return False, []
    def validate_extracted_field_value(self, field: str, value) -> bool:
        """
        Validate the extracted field value using LLM and manual schema checks.

        - If it's a JSON list, remove invalid entries.
        - If it's invalid after cleaning, return False.
        """
        # LLM-Based Validation Prompt
        prompt = (
            f"Validate this extracted value for field '{field}':\n\n{value}\n\n"
            "Is this a valid and correct value and format for the specified field requested? Is it weird for the field or contain none values? Reply ONLY `true` or `false`."
        )
        llm_decision = "true" in self._run1(prompt).lower()

        # If LLM says it's invalid, fail immediately
        if not llm_decision:
            print(f"❌ LLM validation failed for field '{field}'.")
            return False



        # Final check for singular values
        return True

    def extract_fields(
        self, aggregate: str, available_fields: list[str],
        shop_name: str, shop_type: str, city: str, state: str
    ) -> tuple[bool, dict]:
        extracted = {}
        try:
            for field in available_fields:
                try:
                    prompt = (
                        f"{FIELD_EXTRACTORS[field]}\n\nContent:\n{aggregate}\n\n"
                        f"Return ONLY the valid structure requested. Respond with nothing but the correct format requested. "
                        f"If none, say 'none'. Nothing else, nothing before or after our content. Data about {shop_type}, {shop_name}."
                    )
                    raw_value = self._run4(prompt).strip()
                    final_value = self.extract_clean_json_structure(raw_value) or raw_value

                    if self.validate_extracted_field_value(field, final_value):

                        extracted[field] = final_value

                except Exception as inner_e:
                    print(f"⚠️ Field extraction failed for '{field}': {inner_e}")
                    continue

            return True, extracted if extracted else {}

        except Exception as e:
            print(f"⚠️ extract_fields failed: {e}")
            return False, {}


    # ------------ SECTION GENERATION ------------ #
    def create_sections(
        self, shop_name: str, shop_type: str, aggregate: str,
        approved_sections: list[str], city: str | None = None, state: str | None = None
    ) -> tuple[bool, dict]:
        def _generate(section: str, prompt: str) -> str | None:
            raw = self._run4(prompt).strip()
            if self.validate_section_html(shop_name, section, raw):
                return raw
            fixed = self.fix_section_html(shop_name, section, raw)
            return fixed if fixed and self.validate_section_html(shop_name, section, fixed) else None

        location = f"in {city}, {state}" if city or state else ""
        base_instr = (
            f"You are writing for nearestdoor.com about our listings, about the {shop_type} '{shop_name}' {location}. "
            "You will get a summary and write useful information according to your assignment which consumers will read on the nearestdoor.com website about this place, no bad info or bad formatting"
            "Be factual, SEO-friendly, help the users learn use this place and learn about it. no unrelated info, no HTML or asterisks. nothing else, nothing before or after our content. DO NOT USE * "
        )

        sections = {}
        try:
            if "article" in approved_sections:
                prompt = f"{base_instr}\n\nContent:\n{aggregate}\n\nAssignment: Write a detailed article. Write an article about {shop_name} for nearestdoor.com."
                result = _generate("article", prompt)
                if result:
                    sections["article"] = result

            if "faq" in approved_sections:
                prompt = f"{base_instr}\n\nContent:\n{aggregate}\n\nAssignment: Write a detailed FAQ. Write an FAQ about {shop_name} for nearestdoor.com."
                result = _generate("faq", prompt)
                if result:
                    sections["faq"] = result

            if "history" in approved_sections:
                prompt = f"{base_instr}\n\nContent:\n{aggregate}\n\nAssignment: Write the history section about {shop_name} for nearestdoor.com."
                result = _generate("history", prompt)
                if result:
                    sections["history"] = result

            return True, sections
        except Exception as e:
            print(f"❌ create_sections error: {e}")
            return False, {}

    # ------------ FULL WORKFLOW ------------ #
    def process(
        self, shop_name: str, shop_type: str, aggregate: str,
        city: str | None = None, state: str | None = None
    ) -> dict:
        result = {"plan": [], "sections": {}, "fields": {}}

        if not self.check_aggregate_quality(shop_name, aggregate, shop_type, city, state):
            print("❌ Aggregate failed quality check.")
            return None

        ok, plan = self.create_plan(aggregate, shop_name, shop_type, city, state)
        if not ok or not plan:
            print("❌ No sections can be created.")
            return None
        result["plan"] = plan

        ok, sections = self.create_sections(shop_name, shop_type, aggregate, city, state)
        if ok:
            result["sections"] = sections

        ok, available = self.extract_available_fields(aggregate, shop_name, shop_type, city, state)
        if ok and available:
            ok, fields = self.extract_fields(aggregate, available,shop_name, shop_type,city, state)
            if ok:
                result["fields"] = fields

        return result

import re
import json

def is_non_empty_string(value) -> bool:
    return isinstance(value, str) and len(value.strip()) > 0

def is_valid_json(value) -> bool:
    try:
        json.loads(value)
        return True
    except Exception:
        return False

def is_valid_phone(value) -> bool:
    return bool(re.fullmatch(r"\d{3}-\d{3}-\d{4}", value.strip()))

def is_valid_email(value) -> bool:
    return bool(re.fullmatch(r"[^@\s]+@[^@\s]+\.[a-zA-Z0-9]+", value.strip()))

def is_valid_url(value) -> bool:
    return isinstance(value, str) and value.strip().startswith("http")

def is_valid_dict(value) -> bool:
    try:
        return isinstance(json.loads(value), dict)
    except Exception:
        return False

def is_valid_list(value) -> bool:
    try:
        return isinstance(json.loads(value), list)
    except Exception:
        return False

def is_positive_integer_or_string(value) -> bool:
    try:
        return int(str(value).strip()) > 0
    except Exception:
        return False

FIELD_VALIDATORS = {
    # Contact Info
    "extract_phone": is_valid_phone,
    "extract_email": is_valid_email,
    "extract_website": is_valid_url,

    # Structured Fields
    "extract_categories": is_valid_list,
    "extract_operating_hours": is_valid_dict,
    "extract_holiday_hours": is_valid_dict,
    "extract_delivery_services": is_valid_list,
    "extract_social_media": is_valid_dict,
    "extract_stocked_brands": is_valid_list,
    "extract_inventory_categories": is_valid_dict,
    "extract_customer_reviews": is_valid_list,

    # Event / Misc
    "extract_admission": is_non_empty_string,
    "extract_date_available": is_non_empty_string,
    "extract_attendance_amount": is_positive_integer_or_string,
    "extract_exhibitor_amount": is_positive_integer_or_string,
}
FIELD_EXTRACTORS = {
    # Contact Information
    "extract_phone": (
        "Extract the phone number in this format: 727-237-2132. "
        "Return ONLY the number, no quotes, no text, no comments, no markup."
    ),
    "extract_email": (
        "Extract the email address. Example: example@mail.com. "
        "Return ONLY the email address, no quotes, no text, no extras."
    ),
    "extract_website": (
        "Extract the official website URL. Example: https://website.com. "
        "Return ONLY the URL, no quotes, no text, no markup."
    ),

    # JSON / Structured Fields
    "extract_categories": (
        "Extract the product/service categories in JSON list format. "
        "Example: ['Thrift Store', 'Charity']. Return ONLY the JSON array."
    ),
    "extract_operating_hours": (
        "Extract weekly operating hours in JSON dictionary format. "
        "Example: {'monday': '9:00 AM - 5:00 PM', 'sunday': 'Closed'}. "
        "Return ONLY the JSON object."
    ),
    "extract_holiday_hours": (
        "Extract holiday-specific hours in JSON dictionary format. "
        "Example: {'2024-12-25': 'Closed', '2024-12-31': '10:00 AM - 4:00 PM'}. "
        "Return ONLY the JSON object."
    ),
    "extract_delivery_services": (
        "Extract available delivery services in JSON list format. "
        "Example: ['Uber Eats', 'Self Delivery']. Return ONLY the JSON array."
    ),
    "extract_social_media": (
        "Extract social media links in JSON dictionary format. "
        "Example: {'facebook': 'https://facebook.com/example', 'instagram': 'https://instagram.com/example'}. "
        "Return ONLY the JSON object."
    ),
    "extract_stocked_brands": (
        "Extract stocked brands in JSON list format. "
        "Example: ['Nike', 'Adidas']. Return ONLY the JSON array."
    ),
    "extract_inventory_categories": (
        "Extract inventory categories in JSON dictionary format. "
        "Example: {'Apparel': ['Shirts', 'Hoodies']}. Return ONLY the JSON object."
    ),
    "extract_customer_reviews": (
        "Extract customer reviews in JSON list format. "
        "Example: [{'user': 'John', 'comment': 'Great store!', 'rating': 5}]. "
        "Return ONLY the JSON array."
    ),

    # Event / Scheduling
    "extract_admission": (
        "Extract the admission cost or entry fee. Return ONLY the plain text, no prefixes or suffixes."
    ),
    "extract_date_available": (
        "Extract the available date range or date description. "
        "Example: 'Available from May 1st to June 30th'. Return ONLY the plain text."
    ),
    "extract_attendance_amount": (
        "Extract expected attendance as a number. Example: 500. Return ONLY the number or numeric string."
    ),
    "extract_exhibitor_amount": (
        "Extract expected number of exhibitors. Example: 12. Return ONLY the number or numeric string."
    ),
}

class NearestDoorClient:
    def __init__(self, crawler_manager, client_id=CLIENT_ID, api_base=API_BASE):
        self.client_id = client_id
        self.api_base = api_base
        self.crawler_mgr = crawler_manager

        self.ollama = OllamaRunner()
        self.lookup_engine = LookupEngine(self.crawler_mgr, self.ollama)

        self.last_heartbeat = 0

        # Inject dependencies

        self.smartypants = Smartypants(self.ollama)

    def _api_get(self, endpoint, params=None):
        try:
            print(f"📡 GET → {endpoint}")
            response = requests.get(f"{self.api_base}{endpoint}", params=params or {}, timeout=30)
            response.raise_for_status()
            return response
        except requests.RequestException as e:
            print(f"❌ GET failed: {e}")
            return None

    def _api_post(self, endpoint, data):
        try:
            print(f"📡 POST → {endpoint}")
            response = requests.post(f"{self.api_base}{endpoint}", json=data, timeout=30)
            response.raise_for_status()
            return response
        except requests.RequestException as e:
            print(f"❌ POST failed: {e}")
            return None

    def get_task(self):
        res = self._api_get("/next-task", params={"client_id": self.client_id})
        if res and res.status_code == 200:
            task = res.json()
            if isinstance(task, dict) and "task_id" in task:
                return task
            print(f"⚠️ Invalid task structure received: {task}")
        return None

    def send_heartbeat(self, current_task_id=None):
        data = {"client_id": self.client_id}
        if current_task_id:
            data["task_id"] = current_task_id
        self._api_post("/heartbeat", data)
        print("🫀 Heartbeat sent.")

    async def handle_task(self, task):
        task_id =task.get("task_id")
        task_type =task.get("task_type")
        if not task_id or not task_type:
            print("❌ Invalid task format.")
            return

        print(f"▶️ Handling task {task_type} (ID: {task_id})")

        result, summary, mainstring, images = False, None, None, None
        aggregateplan, createdinfo, extractedfields, foundfields = None, None, None, None
        print(task)
        name = task['target'].get("name")
        city = task['target'].get("city")
        state = task['target'].get("state")
        website_url = task['target'].get("website", None)

        shop_type = task['target'].get("shop_type")
        aggregate = task['target'].get("aggregate", "")
        plan = task['target'].get("plan", [])
        fields = task['target'].get("fields", [])

        match task_type:
            case "search":
                result, mainstring, images = await self.lookup_engine.combined_search(name, city, state, shop_type, website_url)
                result = str(result)
            case "aggregate":
                summarizer = ContentSummarizer(self.ollama, name, shop_type, city, state)
                summary = summarizer.summarize_content(aggregate)
                result = bool(summary)

            case "createplan":
                result, aggregateplan = self.smartypants.create_plan(aggregate, name, shop_type, city, state)

            case "create":
                result, createdinfo = self.smartypants.create_sections(name, shop_type, aggregate, plan, city, state)

            case "find_available_fields":
                result, foundfields = self.smartypants.extract_available_fields(aggregate, name, shop_type, city, state)

            case "extract_fields_from_aggregate":
                result, extractedfields = self.smartypants.extract_fields(aggregate, fields, name, shop_type, city, state)

            case _:
                print(f"❌ Unknown task type: {task_type}")
                return
        if result:
                print(f"📤 Submitting result for {task_type} ({task_id})")
                try:
                    if task_type == 'search':
                        res = self._api_post(f"/submit/{task_id}", {"status": "success", "mainstring": mainstring, "client_id": CLIENT_ID})

                    if task_type == 'aggregate':
                        if summary:
                            res = self._api_post(f"/submit/{task_id}", {"status": "success", "summary": summary, "client_id": CLIENT_ID})
                        else:
                            print("nosummary")
                            res = self._api_post(f"/submit/{task_id}", {"status": "fail", "client_id": CLIENT_ID})


                    if task_type == 'createplan':
                        res = self._api_post(f"/submit/{task_id}", {"status": "success", "aggregateplan": aggregateplan, "client_id": CLIENT_ID})
                    if task_type == 'create':
                        res = self._api_post(f"/submit/{task_id}", {"status": "success", "createdinfo":createdinfo, "client_id": CLIENT_ID})
                    if task_type == 'find_available_fields':
                        res = self._api_post(f"/submit/{task_id}", {"status": "success", "foundfields": foundfields, "client_id": CLIENT_ID})
                    if task_type == 'extract_fields_from_aggregate':
                        res = self._api_post(f"/submit/{task_id}", {"status": "success", "extractedfields": extractedfields, "client_id": CLIENT_ID})

                    print(f"Server responded: {res.status_code} - {res.text}")
                    if res.status_code == 200:
                        print(f"✅ Submitted: {task_type}")
                    else:
                        print(f"❌ Submit failed: {task_type} - {res.status_code}")
                except Exception as e:
                    print(f"Submit exception: {e}")
        else:

            print(f"Submit Failure {task_type}")
            res = self._api_post(f"/submit/{task_id}", {"status": "fail", "client_id": CLIENT_ID})



    async def run(self):


        try:
            while True:
                task = self.get_task()
                if task:
                    now = time.time()
                    if now - self.last_heartbeat > HEARTBEAT_INTERVAL:
                        self.send_heartbeat(task.get("task_id"))
                        self.last_heartbeat = now
                    await self.handle_task(task)
                else:
                    print("⏳ No task available, sleeping...")
                    await asyncio.sleep(10)
        finally:
            await self.crawler_mgr.stop()
if __name__ == "__main__":
    import sys

    # Ensure compatibility with environments like Jupyter or IPython
    nest_asyncio.apply()

    # Create the crawler manager
    crawler_manager = LookupEngine.CrawlerManager()
    await crawler_manager.start()
    # Create the client
    client = NearestDoorClient(crawler_manager)

    # Run the client loop
    try:
        asyncio.run(client.run())
    except KeyboardInterrupt:
        print("\n🛑 Shutting down gracefully...")
        sys.exit(0)

✅ Environment Fully Ready!


📡 GET → /next-task
📡 POST → /heartbeat
🫀 Heartbeat sent.
▶️ Handling task search (ID: 30658)
{'task_id': 30658, 'task_type': 'search', 'object_type': 'shop', 'data': {}, 'target': {'id': 31337, 'name': 'Eldridge dog park', 'city': 'Elmira', 'state': 'New York', 'website': None, 'slug': 'eldridge-dog-park', 'shop_type': 'dog-park'}}
🌐 Starting combined search…
🔎 Google search → Eldridge dog park Elmira New York dog-park 
error in basicc checker {'https://www.bringfido.com/attraction/13506'} {'Eldridge dog park'} {'dog-park'} Crawler not started
error in basicc checker {'https://www.cityofelmira.net/208/off-lease-area-for-dogs'} {'Eldridge dog park'} {'dog-park'} Crawler not started
error in basicc checker {'https://www.mapquest.com/us/new-york/dog-park-at-eldrige-park-381009049'} {'Eldridge dog park'} {'dog-park'} Crawler not started
error in basicc checker {'https://www.cityofelmira.net/facilities/facility/details/eldridgepark-8'} {'Eldridge dog park'} {'dog-park'} Crawler not started
