In [9]:
import os
import json
import re
import time
from collections import defaultdict
from urllib.parse import urljoin, urlparse

import requests
from bs4 import BeautifulSoup
from fuzzywuzzy import fuzz

"""
A Jupiter Community crawler that **updates the output JSON file after every
new page is processed**, so you can open the file at any moment while the
script is running and verify progress in real‑time.
"""

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/91.0.4472.124 Safari/537.36"
    )
}

MAX_PAGES: int = 100               # prevent infinite loops
REQUEST_DELAY: float = 1.0         # polite crawl delay (seconds)
DATA_FILENAME: str = "jupiter_faq_data_realtime.json"  # live‑updating JSON

# ---------------------------------------------------------------------------
# Helpers – text normalisation, similarity, deduplication
# ---------------------------------------------------------------------------

def clean_text(text: str) -> str | None:
    """Remove extra whitespace/HTML and return a clean string (or None)."""
    text = re.sub(r"\s+", " ", text)
    text = re.sub(r"<[^>]+>", "", text)
    text = text.strip()
    return text or None


def normalize_text(text: str) -> str:
    text = text.lower().strip()
    text = re.sub(r"[^\w\s]", "", text)
    return text


def are_texts_similar(t1: str, t2: str, threshold: int = 85) -> bool:
    return fuzz.token_sort_ratio(normalize_text(t1), normalize_text(t2)) > threshold

# ---------------------------------------------------------------------------
# Categorisation & extraction
# ---------------------------------------------------------------------------

def categorize_page(url: str) -> str:
    """Roughly classify a page by its URL path."""
    url_path = urlparse(url).path.lower()
    if any(k in url_path for k in ("help", "faq")):
        return "Help/FAQ"
    if any(k in url_path for k in ("feature", "product")):
        return "Features"
    if any(k in url_path for k in ("about", "team", "company")):
        return "About"
    if "blog" in url_path:
        return "Blog"
    if url_path in {"/", "", "/home"}:
        return "Home"
    return "General"


def extract_page_data(url: str, soup: BeautifulSoup) -> dict | None:
    """Return structured data (url, title, content[]) or None if empty."""
    data: dict = {
        "url": url,
        "title": clean_text(soup.title.get_text() if soup.title else ""),
        "content": [],
    }

    selectors = [
        ("p", {}),
        ("h1", {}),
        ("h2", {}),
        ("h3", {}),
        ("h4", {}),
        ("li", {}),
        (
            "div",
            {
                "class_": re.compile(
                    r"content|text|description|section|article", re.I
                )
            },
        ),
    ]

    seen_texts: set[str] = set()
    for tag, kwargs in selectors:
        for elem in soup.find_all(tag, **kwargs):
            text = clean_text(elem.get_text())
            if not text or len(text) <= 20:
                continue  # ignore trivial strings
            if any(are_texts_similar(text, s) for s in seen_texts):
                continue  # skip near‑duplicates
            seen_texts.add(text)
            data["content"].append(text)

    return data if data["content"] else None

# ---------------------------------------------------------------------------
# JSON utilities – keep the output file always current
# ---------------------------------------------------------------------------

def _atomic_write_json(data: dict, filename: str) -> None:
    """Write JSON atomically so partially‑written files are avoided."""
    tmp = f"{filename}.tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=4, ensure_ascii=False)
    os.replace(tmp, filename)  # atomic operation on most OSes


def save_to_json(data_store: defaultdict, filename: str = DATA_FILENAME) -> None:
    """Persist *entire* datastore to disk – called after every new page."""
    _atomic_write_json(data_store, filename)
    print(f"[💾] JSON updated → {filename} (categories: {len(data_store)})")

# ---------------------------------------------------------------------------
# Core crawler – DFS with polite delay + real‑time JSON flushes
# ---------------------------------------------------------------------------
visited_urls: set[str] = set()


def crawl_page(
    url: str,
    base_url: str,
    data_store: defaultdict,
    filename: str = DATA_FILENAME,
) -> None:
    if len(visited_urls) >= MAX_PAGES or url in visited_urls:
        return
    if not url.startswith(base_url):  # stay on‑site
        return

    visited_urls.add(url)
    print(f"[→] Crawling: {url}")

    try:
        response = requests.get(url, headers=HEADERS, timeout=10)
        response.raise_for_status()
    except requests.RequestException as exc:
        print(f"[✗] Error fetching {url}: {exc}")
        return

    soup = BeautifulSoup(response.content, "html.parser")

    # Extract + categorise
    page_data = extract_page_data(url, soup)
    if page_data:
        category = categorize_page(url)
        data_store[category].append(page_data)
        save_to_json(data_store, filename)  # 🔑 real‑time update here!

    # Follow links politely
    for link in soup.find_all("a", href=True):
        href = link["href"]
        full_url = urljoin(base_url, href)
        if urlparse(full_url).netloc != urlparse(base_url).netloc:
            continue  # external link
        time.sleep(REQUEST_DELAY)
        crawl_page(full_url, base_url, data_store, filename)


# ---------------------------------------------------------------------------
# Public API – crawl an entire site
# ---------------------------------------------------------------------------

def crawl_jupiter_website(base_url: str, filename: str = DATA_FILENAME) -> dict:
    """Run the crawler; returns the full data structure (also on disk)."""
    data_store: defaultdict = defaultdict(list)
    save_to_json(data_store, filename)  # ensure file exists from the start
    crawl_page(base_url, base_url, data_store, filename)
    return data_store


# ---------------------------------------------------------------------------
# Entry‑point (CLI)
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    BASE_URL = "https://community.jupiter.money/c/help/27"
    crawl_jupiter_website(BASE_URL)
    print("✅ Crawl finished.")

[💾] JSON updated → jupiter_faq_data_realtime.json (categories: 0)
[→] Crawling: https://community.jupiter.money/c/help/27
[💾] JSON updated → jupiter_faq_data_realtime.json (categories: 1)
[→] Crawling: https://community.jupiter.money/c/help/27?page=1
[💾] JSON updated → jupiter_faq_data_realtime.json (categories: 1)
[→] Crawling: https://community.jupiter.money/c/help/27.json
[→] Crawling: https://community.jupiter.money/c/help/27?page=2
[💾] JSON updated → jupiter_faq_data_realtime.json (categories: 1)
[→] Crawling: https://community.jupiter.money/c/help/27?page=3
[💾] JSON updated → jupiter_faq_data_realtime.json (categories: 1)


KeyboardInterrupt: 

# only Q & A extraction

In [10]:
import os
import re
import json
import time
from collections import defaultdict
from urllib.parse import urljoin, urlparse

import requests

"""
Jupiter **Help / FAQ** crawler
-----------------------------
• Grabs every topic (question) from a Discourse category page
• Uses the official JSON endpoints (`/c/.../.json` + `/t/.../.json`) to
  avoid brittle HTML scraping
• Writes a *live‑updating* `jupiter_faq_qa_realtime.json` containing:

    {
      "help/debit-cards": [
        {
          "question": "Custom metal debit card",
          "answers": ["…accepted/staff reply…"],
          "url": "https://community.jupiter.money/t/custom-metal-debit-card/53951"
        },
        …
      ],
      "help/payments": [ … ]
    }

Stop the script at any time; the JSON on disk is already complete up to
that point.
"""

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/114.0.0.0 Safari/537.36"
    ),
    "Accept": "application/json, text/html",  # Discourse is happy with either
}

CATEGORY_URL = "https://community.jupiter.money/c/help/27"  # 💡 change me
DATA_FILENAME = "jupiter_faq_qa_realtime.json"
REQUEST_DELAY = 0.6  # polite delay between topic requests (seconds)

# ---------------------------------------------------------------------------
# JSON helpers – atomic write so the file is *always* valid
# ---------------------------------------------------------------------------

def _atomic_write_json(data: dict, filename: str) -> None:
    tmp = f"{filename}.tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
    os.replace(tmp, filename)


def _flush(data_store: defaultdict, filename: str = DATA_FILENAME) -> None:
    _atomic_write_json(data_store, filename)
    total = sum(len(v) for v in data_store.values())
    print(f"[💾] {total} QA pairs saved → {filename}")

# ---------------------------------------------------------------------------
# Discourse utilities
# ---------------------------------------------------------------------------

def _strip_html(html: str) -> str:
    """Remove tags + collapse whitespace to a single space."""
    text = re.sub(r"<[^>]*>", "", html)
    return re.sub(r"\s+", " ", text).strip()


# ---------- TOPIC LIST ---------- #

def get_topic_urls(category_url: str) -> list[str]:
    """Return absolute URLs for every topic in the Discourse category."""
    api = category_url.rstrip("/") + ".json"
    res = requests.get(api, headers=HEADERS, timeout=10)
    res.raise_for_status()
    j = res.json()

    root = f"{urlparse(category_url).scheme}://{urlparse(category_url).netloc}"
    urls = [f"{root}/t/{t['slug']}/{t['id']}" for t in j["topic_list"]["topics"]]
    print(f"[→] Found {len(urls)} topics in {category_url}")
    return urls


# ---------- SINGLE TOPIC ---------- #

def _pick_answers(posts: list[dict], accepted_id: int | None) -> list[str]:
    """Return list of answer strings based on accepted/staff/fallback rules."""
    # 1️⃣ Accepted‑answer plugin
    if accepted_id:
        for p in posts:
            if p["id"] == accepted_id:
                return [_strip_html(p["cooked"])]

    # 2️⃣ First staff/moderator reply
    for p in posts[1:]:  # skip original post
        if p.get("staff") or p.get("moderator"):
            return [_strip_html(p["cooked"])]

    # 3️⃣ Fallback – the very first reply
    if len(posts) > 1:
        return [_strip_html(posts[1]["cooked"])]

    return []


def fetch_topic(topic_url: str) -> dict:
    """Return a QA dict for one topic."""
    api = topic_url + ".json"
    j = requests.get(api, headers=HEADERS, timeout=10).json()

    posts = j["post_stream"]["posts"]
    accepted_id = j.get("accepted_answer", {}).get("post_id") or j.get("accepted_answer", {}).get("id")
    answers = _pick_answers(posts, accepted_id)

    return {
        "category": j.get("category_slug", "help"),
        "subcategory": j.get("subcategory_slug") or "general",
        "question": j["title"],
        "answers": answers,
        "url": topic_url,
    }

# ---------------------------------------------------------------------------
# Main crawl function
# ---------------------------------------------------------------------------

def crawl_category(category_url: str = CATEGORY_URL) -> dict:
    data_store: defaultdict = defaultdict(list)
    _flush(data_store)  # create/clear file at once

    for url in get_topic_urls(category_url):
        try:
            qa = fetch_topic(url)
            key = f"{qa['category']}/{qa['subcategory']}".rstrip("/")
            data_store[key].append({k: qa[k] for k in ("question", "answers", "url")})
            _flush(data_store)
            time.sleep(REQUEST_DELAY)
        except requests.RequestException as exc:
            print(f"[✗] {url}: {exc}")

    return data_store


# ---------------------------------------------------------------------------
# CLI entry‑point
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    crawl_category()
    print("✅ Crawl finished.")


[💾] 0 QA pairs saved → jupiter_faq_qa_realtime.json
[→] Found 30 topics in https://community.jupiter.money/c/help/27
[💾] 1 QA pairs saved → jupiter_faq_qa_realtime.json
[💾] 2 QA pairs saved → jupiter_faq_qa_realtime.json
[💾] 3 QA pairs saved → jupiter_faq_qa_realtime.json
[💾] 4 QA pairs saved → jupiter_faq_qa_realtime.json
[💾] 5 QA pairs saved → jupiter_faq_qa_realtime.json
[💾] 6 QA pairs saved → jupiter_faq_qa_realtime.json
[💾] 7 QA pairs saved → jupiter_faq_qa_realtime.json
[💾] 8 QA pairs saved → jupiter_faq_qa_realtime.json
[💾] 9 QA pairs saved → jupiter_faq_qa_realtime.json
[💾] 10 QA pairs saved → jupiter_faq_qa_realtime.json
[💾] 11 QA pairs saved → jupiter_faq_qa_realtime.json
[💾] 12 QA pairs saved → jupiter_faq_qa_realtime.json
[💾] 13 QA pairs saved → jupiter_faq_qa_realtime.json
[💾] 14 QA pairs saved → jupiter_faq_qa_realtime.json
[💾] 15 QA pairs saved → jupiter_faq_qa_realtime.json
[💾] 16 QA pairs saved → jupiter_faq_qa_realtime.json
[💾] 17 QA pairs saved → jupiter_faq_qa_realt

# Q & A extraction with meta data of all text 

In [12]:
import os
import re
import json
import time
from collections import defaultdict
from urllib.parse import urljoin, urlparse
import requests

HEADERS = {"User-Agent": "Mozilla/5.0", "Accept": "application/json, text/html"}
CATEGORY_URL = "https://community.jupiter.money/c/help/27"
DATA_FILENAME = "jupiter_faq_full_2.json"
REQUEST_DELAY = 0.5

def _atomic_write(data, fn):
    tmp = fn + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
    try:
        os.replace(tmp, fn)
        total = sum(len(v) for v in data.values())
        print(f"[💾] Saved {total} QAs to {fn}")
    except PermissionError:
        print("❌ File is open elsewhere. Skipping atomic update.")

def _strip_html(html):
    text = re.sub(r"<[^>]+>", "", html)
    return re.sub(r"\s+", " ", text).strip()

def get_all_topics(cat_url):
    page = 0; all_topics = []
    base = f"{urlparse(cat_url).scheme}://{urlparse(cat_url).netloc}"
    while True:
        r = requests.get(f"{cat_url}.json?page={page}", headers=HEADERS)
        r.raise_for_status()
        j = r.json()
        topics = j["topic_list"]["topics"]
        if not topics: break
        for t in topics:
            all_topics.append({
                "id": t["id"],
                "slug": t["slug"],
                "title": t["title"],
                "tags": t["tags"],
                "posts_count": t["posts_count"],
                "views": t["views"],
                "created_at": t["created_at"],
                "last_posted_at": t["last_posted_at"],
                "category": j.get("category", {}).get("slug") or j.get("category_slug"),
                "subcategory": j.get("subcategory_slug")
            })
        print(f"[→] Page {page}: {len(topics)} topics")
        page += 1
        time.sleep(REQUEST_DELAY)
    return all_topics

def fetch_topic_detail(topic):
    url = f"https://community.jupiter.money/t/{topic['slug']}/{topic['id']}.json"
    j = requests.get(url, headers=HEADERS).json()
    posts = j["post_stream"]["posts"]
    answers = [_strip_html(p["cooked"]) for p in posts[1:]]  # all except OP
    return answers

def crawl():
    data = defaultdict(list)
    all_topics = get_all_topics(CATEGORY_URL)
    for t in all_topics:
        answers = fetch_topic_detail(t)
        key = f"{t['category']}/{t.get('subcategory') or 'general'}"
        entry = {
            "url": f"https://community.jupiter.money/t/{t['slug']}/{t['id']}",
            "question": t["title"],
            "answers": answers,
            "tags": t["tags"],
            "posts_count": t["posts_count"],
            "views": t["views"],
            "created_at": t["created_at"],
            "last_posted_at": t["last_posted_at"],
        }
        data[key].append(entry)
        _atomic_write(data, DATA_FILENAME)
        time.sleep(REQUEST_DELAY)
    print("✅ Done crawling")

if __name__ == "__main__":
    crawl()


[→] Page 0: 30 topics
[→] Page 1: 30 topics
[→] Page 2: 30 topics
[→] Page 3: 30 topics
[→] Page 4: 30 topics
[→] Page 5: 30 topics
[→] Page 6: 30 topics
[→] Page 7: 30 topics
[→] Page 8: 30 topics
[→] Page 9: 30 topics
[→] Page 10: 30 topics
[→] Page 11: 30 topics
[→] Page 12: 30 topics
[→] Page 13: 30 topics
[→] Page 14: 30 topics
[→] Page 15: 30 topics
[→] Page 16: 30 topics
[→] Page 17: 30 topics
[→] Page 18: 14 topics
[💾] Saved 1 QAs to jupiter_faq_full_2.json
[💾] Saved 2 QAs to jupiter_faq_full_2.json
[💾] Saved 3 QAs to jupiter_faq_full_2.json
[💾] Saved 4 QAs to jupiter_faq_full_2.json
[💾] Saved 5 QAs to jupiter_faq_full_2.json
[💾] Saved 6 QAs to jupiter_faq_full_2.json
[💾] Saved 7 QAs to jupiter_faq_full_2.json
[💾] Saved 8 QAs to jupiter_faq_full_2.json
[💾] Saved 9 QAs to jupiter_faq_full_2.json
[💾] Saved 10 QAs to jupiter_faq_full_2.json
[💾] Saved 11 QAs to jupiter_faq_full_2.json
[💾] Saved 12 QAs to jupiter_faq_full_2.json
[💾] Saved 13 QAs to jupiter_faq_full_2.json
[💾] Saved 

In [19]:
import requests
from bs4 import BeautifulSoup
import json
import time
import os
from urllib.parse import urljoin, urlparse
from typing import List, Dict, Set
import logging

class FAQCrawler:
    def __init__(self, base_url: str, output_file: str = "faqs.json"):
        """
        Initialize the FAQ crawler
        
        Args:
            base_url: The base URL of the website to crawl
            output_file: JSON file to save FAQs
        """
        self.base_url = base_url
        self.output_file = output_file
        self.visited_urls: Set[str] = set()
        self.faqs: List[Dict] = []
        self.session = requests.Session()
        
        # Set up headers to mimic a real browser
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })
        
        # Set up logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
        self.logger = logging.getLogger(__name__)
        
        # Load existing FAQs if file exists
        self.load_existing_faqs()
    
    def load_existing_faqs(self):
        """Load existing FAQs from JSON file if it exists"""
        if os.path.exists(self.output_file):
            try:
                with open(self.output_file, 'r', encoding='utf-8') as f:
                    self.faqs = json.load(f)
                self.logger.info(f"Loaded {len(self.faqs)} existing FAQs from {self.output_file}")
            except Exception as e:
                self.logger.error(f"Error loading existing FAQs: {e}")
    
    def save_faqs(self):
        """Save FAQs to JSON file in real-time"""
        try:
            with open(self.output_file, 'w', encoding='utf-8') as f:
                json.dump(self.faqs, f, indent=2, ensure_ascii=False)
            self.logger.info(f"Saved {len(self.faqs)} FAQs to {self.output_file}")
        except Exception as e:
            self.logger.error(f"Error saving FAQs: {e}")
    
    def get_page_content(self, url: str) -> BeautifulSoup:
        """
        Fetch and parse page content
        
        Args:
            url: URL to fetch
            
        Returns:
            BeautifulSoup object or None if failed
        """
        try:
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            return BeautifulSoup(response.content, 'html.parser')
        except Exception as e:
            self.logger.error(f"Error fetching {url}: {e}")
            return None
    
    def extract_faqs_from_page(self, soup: BeautifulSoup, url: str) -> List[Dict]:
        """
        Extract FAQ question-answer pairs from a page
        
        Args:
            soup: BeautifulSoup object of the page
            url: URL of the page
            
        Returns:
            List of FAQ dictionaries
        """
        faqs = []
        
        # Look for FAQ containers - multiple possible selectors
        faq_containers = soup.find_all(['div'], class_=lambda x: x and 'faq' in x.lower()) or \
                        soup.find_all(['div'], attrs={'data-controller': 'faq-toggle'}) or \
                        soup.find_all(['div'], class_='faq-item')
        
        if not faq_containers:
            # Try to find FAQ items directly
            faq_items = soup.find_all(['div'], class_='faq-item')
        else:
            # Look for FAQ items within containers
            faq_items = []
            for container in faq_containers:
                faq_items.extend(container.find_all(['div'], class_='faq-item'))
        
        # If still no FAQ items found, try alternative selectors
        if not faq_items:
            faq_items = soup.find_all(['div', 'article', 'section'], class_=lambda x: x and any(
                keyword in x.lower() for keyword in ['faq', 'question', 'accordion', 'toggle']
            ))
        
        for item in faq_items:
            try:
                # Extract question
                question_elem = item.find(['span', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'div'], 
                                        class_=lambda x: x and any(
                                            keyword in x.lower() for keyword in ['header', 'question', 'title']
                                        ))
                
                if not question_elem:
                    # Try to find question by looking for clickable elements
                    question_elem = item.find(['span', 'div'], attrs={'data-action': lambda x: x and 'toggle' in x})
                
                if not question_elem:
                    # Try to find the first span or heading
                    question_elem = item.find(['span', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6'])
                
                # Extract answer
                answer_elem = item.find(['div', 'p'], class_=lambda x: x and 'answer' in x.lower())
                
                if not answer_elem:
                    # Try to find answer by data attribute
                    answer_elem = item.find(['div'], attrs={'data-faq-toggle-target': 'answer'})
                
                if not answer_elem:
                    # Try to find the last paragraph or div with text
                    answer_elem = item.find(['p', 'div'], string=lambda text: text and len(text.strip()) > 20)
                
                if question_elem and answer_elem:
                    question = question_elem.get_text(strip=True)
                    answer = answer_elem.get_text(strip=True)
                    
                    if question and answer and len(question) > 5 and len(answer) > 10:
                        faq_data = {
                            'question': question,
                            'answer': answer,
                            'source_url': url,
                            'extracted_at': time.strftime('%Y-%m-%d %H:%M:%S')
                        }
                        faqs.append(faq_data)
                        self.logger.info(f"Extracted FAQ: {question[:50]}...")
                        
            except Exception as e:
                self.logger.error(f"Error extracting FAQ from item: {e}")
                continue
        
        return faqs
    
    def find_faq_pages(self, start_url: str, max_pages: int = 30) -> List[str]:
        """
        Find all FAQ pages on the website
        
        Args:
            start_url: Starting URL to search from
            max_pages: Maximum number of pages to crawl
            
        Returns:
            List of FAQ page URLs
        """
        faq_pages = []
        to_visit = [start_url]
        visited = set()
        
        while to_visit and len(faq_pages) < max_pages:
            url = to_visit.pop(0)
            if url in visited:
                continue
                
            visited.add(url)
            self.logger.info(f"Checking page: {url}")
            
            soup = self.get_page_content(url)
            if not soup:
                continue
            
            # Check if this page contains FAQs
            if self.has_faqs(soup):
                faq_pages.append(url)
                self.logger.info(f"Found FAQ page: {url}")
            
            # Find links to other pages that might contain FAQs
            links = soup.find_all('a', href=True)
            for link in links:
                href = link['href']
                full_url = urljoin(url, href)
                
                # Only follow internal links
                if urlparse(full_url).netloc == urlparse(self.base_url).netloc:
                    # Look for FAQ-related keywords in URL or link text
                    link_text = link.get_text(strip=True).lower()
                    if (any(keyword in full_url.lower() for keyword in ['faq', 'help', 'support', 'question']) or
                        any(keyword in link_text for keyword in ['faq', 'help', 'support', 'question'])):
                        if full_url not in visited and full_url not in to_visit:
                            to_visit.append(full_url)
        
        return faq_pages
    
    def has_faqs(self, soup: BeautifulSoup) -> bool:
        """
        Check if a page contains FAQs
        
        Args:
            soup: BeautifulSoup object of the page
            
        Returns:
            True if page contains FAQs
        """
        # Look for FAQ indicators
        faq_indicators = [
            soup.find_all(string=lambda text: text and 'frequently asked' in text.lower()),
            soup.find_all(['div'], class_=lambda x: x and 'faq' in x.lower()),
            soup.find_all(['div'], attrs={'data-controller': 'faq-toggle'}),
            soup.find_all(['div'], class_='faq-item'),
            soup.find_all(['h1', 'h2', 'h3'], string=lambda text: text and 'faq' in text.lower())
        ]
        
        return any(indicators for indicators in faq_indicators)
    
    def crawl_faqs(self, start_url: str = None, max_pages: int = 30):
        """
        Main method to crawl FAQs from the website
        
        Args:
            start_url: Starting URL (defaults to base_url)
            max_pages: Maximum number of pages to crawl
        """
        if not start_url:
            start_url = self.base_url
        
        self.logger.info(f"Starting FAQ crawl from: {start_url}")
        
        # Find all FAQ pages
        faq_pages = self.find_faq_pages(start_url, max_pages)
        
        if not faq_pages:
            self.logger.warning("No FAQ pages found!")
            return
        
        self.logger.info(f"Found {len(faq_pages)} FAQ pages to crawl")
        
        # Extract FAQs from each page
        total_new_faqs = 0
        for page_url in faq_pages:
            if page_url in self.visited_urls:
                continue
                
            self.logger.info(f"Crawling FAQs from: {page_url}")
            soup = self.get_page_content(page_url)
            
            if soup:
                page_faqs = self.extract_faqs_from_page(soup, page_url)
                
                # Add new FAQs to the list
                for faq in page_faqs:
                    # Check for duplicates based on question
                    if not any(existing['question'] == faq['question'] for existing in self.faqs):
                        self.faqs.append(faq)
                        total_new_faqs += 1
                
                self.visited_urls.add(page_url)
                
                # Save FAQs in real-time
                self.save_faqs()
                
                # Small delay to be respectful
                time.sleep(1)
        
        self.logger.info(f"Crawling completed! Total FAQs: {len(self.faqs)}, New FAQs: {total_new_faqs}")

def main():
    # Example usage
    base_url = "https://jupiter.money/"  # Replace with your target website
    
    # Initialize crawler
    crawler = FAQCrawler(base_url)
    
    # Start crawling
    crawler.crawl_faqs()
    
    print(f"Crawling completed! Check {crawler.output_file} for results.")

if __name__ == "__main__":
    main()

NoSuchDriverException: Message: Unable to obtain driver for chrome; For documentation on this error, please visit: https://www.selenium.dev/documentation/webdriver/troubleshooting/errors/driver_location


In [14]:
import json
import os
import asyncio
from bs4 import BeautifulSoup
import requests
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain.embeddings import GoogleGenerativeAIEmbeddings
from langchain.schema import Document
from langchain.vectorstores import Chroma
from langgraph.graph import StateGraph, END
from typing import Dict, Any
from datetime import datetime

# Set up Gemini API key (replace with your actual key)
os.environ["GOOGLE_API_KEY"] = "your-gemini-api-key"

# JSON file for real-time FAQ storage
FAQ_JSON_PATH = "jupiter_faq.json"

# Scrape FAQs from Jupiter's main Help Centre page (toggle sections only)
def scrape_faq_main_page():
    url = "https://jupiter.money/help/"  # Replace with actual Jupiter Help Centre URL
    headers = {"User-Agent": "Mozilla/5.0"}
    response = requests.get(url, headers=headers)
    soup = BeautifulSoup(response.content, "html.parser")
    
    faq_data = []
    # Target toggle sections (e.g., accordion or FAQ toggles, adjust selector as needed)
    toggle_sections = soup.select("div.accordion, details, .faq-toggle")  # Adjust CSS selector for Jupiter's site
    for section in toggle_sections:
        question = section.find("h3, summary, .question")  # Adjust for question element
        answer = section.find("div, p, .answer")  # Adjust for answer element
        if question and answer:
            question_text = question.get_text(strip=True).lower()
            answer_text = answer.get_text(strip=True)
            faq_data.append({
                "question": question_text,
                "answers": [answer_text],
                "tags": [],  # Add logic for tags if available
                "url": url,
                "created_at": datetime.utcnow().isoformat() + "Z"
            })
    
    # Update JSON file in real-time
    with open(FAQ_JSON_PATH, "w") as f:
        json.dump(faq_data, f, indent=2)
    return faq_data

# Initialize Gemini LLM and embeddings
llm = ChatGoogleGenerativeAI(model="gemini-1.5-flash", temperature=0.7)
embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")

# Clean and preprocess FAQ data
def preprocess_faq(data):
    documents = []
    for item in data:
        question = item["question"].strip()
        answers = [ans.strip() for ans in item["answers"]]
        answer_text = " ".join(answers)
        tags = item.get("tags", [])
        metadata = {"tags": tags, "url": item["url"]}
        doc = Document(page_content=f"Question: {question}\nAnswer: {answer_text}", metadata=metadata)
        documents.append(doc)
    return documents

# Store FAQs in ChromaDB
def store_in_chroma(documents):
    vectorstore = Chroma.from_documents(documents, embeddings, collection_name="jupiter_faq")
    return vectorstore

# Create RAG chain
def create_rag_chain(vectorstore):
    prompt_template = PromptTemplate(
        input_variables=["context", "question"],
        template="""
        You are a friendly FAQ bot for Jupiter's Help Centre. Using the provided context, answer the user's question in a conversational tone. If no relevant answer is found, say so politely and offer further assistance.

        Context: {context}
        User Question: {question}

        Answer:
        """
    )
    rag_chain = LLMChain(llm=llm, prompt=prompt_template)
    return rag_chain

# LangGraph state and workflow
class BotState(Dict[str, Any]):
    question: str
    context: str
    answer: str
    related_questions: list

def retrieve_context(state: BotState, vectorstore):
    docs = vectorstore.similarity_search(state["question"], k=3)
    state["context"] = "\n".join([doc.page_content for doc in docs])
    return state

def generate_answer(state: BotState, rag_chain):
    response = rag_chain.run(context=state["context"], question=state["question"])
    state["answer"] = response
    return state

def suggest_related(state: BotState, vectorstore):
    related_docs = vectorstore.similarity_search(state["question"], k=5)[1:]  # Skip top match
    state["related_questions"] = [doc.page_content.split("\n")[0].replace("Question: ", "") for doc in related_docs]
    return state

def create_workflow(vectorstore, rag_chain):
    workflow = StateGraph(BotState)
    workflow.add_node("retrieve_context", lambda state: retrieve_context(state, vectorstore))
    workflow.add_node("generate_answer", lambda state: generate_answer(state, rag_chain))
    workflow.add_node("suggest_related", lambda state: suggest_related(state, vectorstore))
    
    workflow.set_entry_point("retrieve_context")
    workflow.add_edge("retrieve_context", "generate_answer")
    workflow.add_edge("generate_answer", "suggest_related")
    workflow.add_edge("suggest_related", END)
    
    return workflow.compile()

# Main interaction loop
async def run_bot():
    # Scrape FAQs and update JSON
    faq_data = scrape_faq_main_page()
    documents = preprocess_faq(faq_data)
    vectorstore = store_in_chroma(documents)
    rag_chain = create_rag_chain(vectorstore)
    app = create_workflow(vectorstore, rag_chain)
    
    print("Jupiter FAQ Bot is ready! Type 'exit' to quit.")
    while True:
        question = input("Your question: ").strip()
        if question.lower() == "exit":
            break
        
        inputs = BotState(question=question, context="", answer="", related_questions=[])
        result = await app.ainvoke(inputs)
        
        print("\nAnswer:", result["answer"])
        print("\nRelated Questions:")
        for q in result["related_questions"]:
            print(f"- {q}")

if __name__ == "__main__":
    asyncio.run(run_bot())

Crawling: https://jupiter.money/
Crawling: https://jupiter.money/contact-us/
Crawling: https://jupiter.money
Crawling: https://jupiter.money/edge-plus-upi-rupay-credit-card/
Crawling: https://jupiter.money/edge-csb-rupay-credit-card/
Crawling: https://jupiter.money/edge-visa-credit-card/
Crawling: https://jupiter.money/savings-account
Crawling: https://jupiter.money/pro-salary-account/
Crawling: https://jupiter.money/corporate-salary-account
Crawling: https://jupiter.money/pots
Crawling: https://jupiter.money/payments
Crawling: https://jupiter.money/bills-recharges
Crawling: https://jupiter.money/pay-via-upi
Crawling: https://jupiter.money/magic-spends
Crawling: https://jupiter.money/money
Crawling: https://jupiter.money/loan
Crawling: https://jupiter.money/loan-against-mutual-funds
Crawling: https://jupiter.money/investments
Crawling: https://jupiter.money/mutual-funds
Crawling: https://jupiter.money/digi-gold
Crawling: https://jupiter.money/flexi-fd
Crawling: https://jupiter.money/re

KeyboardInterrupt: 