In [26]:
import os
import logging
from dotenv import load_dotenv
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import wikipedia
from ddgs import DDGS


# Root logger: INFO and above, each record stamped with time and severity.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)

# Let python-dotenv populate the environment before the lookups below.
load_dotenv()


# Credentials come from the environment; a variable that is not set yields None.
API_KEY = os.environ.get("GOOGLE_API_KEY")
CX = os.environ.get("GOOGLE_SEARCH_ENGINE_ID")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")

# Google Custom Search client. It stays None when either credential is missing
# or when building the client fails, so later code can test it for truthiness.
service = None
if not (API_KEY and CX):
    logging.warning("Google Search API credentials not found. Skipping Google Search.")
else:
    try:
        service = build("customsearch", "v1", developerKey=API_KEY)
    except Exception as e:
        logging.warning(f"Failed to initialize Google Search API: {e}")
    else:
        logging.info("Google Search API initialized successfully.")


def google_search(query: str, max_results: int = 5):
    """Query Google Custom Search and return normalized result dicts.

    Args:
        query: Free-text search query.
        max_results: Upper bound on the number of results returned. The
            Custom Search API serves at most 10 results per request, so
            larger values are clamped to 10. Non-positive values return an
            empty list without contacting the API.

    Returns:
        A list of dicts with the keys "title", "link" and "snippet"; fields
        missing from an API item default to "".

    Raises:
        RuntimeError: If the module-level ``service`` client was not built.
        HttpError: Re-raised after logging when the API rejects the request.
        Exception: Any other failure is logged and re-raised.
    """
    if not service:
        raise RuntimeError("Google Search service not initialized.")
    if max_results <= 0:
        # Nothing was asked for; do not spend API quota on it.
        return []

    # `num` must lie in 1..10 for cse.list; asking only for what is needed
    # avoids fetching a full default page just to slice it afterwards.
    page_size = min(max_results, 10)
    try:
        res = service.cse().list(q=query, cx=CX, num=page_size).execute()
        return [
            {
                "title": item.get("title", ""),
                "link": item.get("link", ""),
                "snippet": item.get("snippet", "")
            }
            # Slice defensively in case the response holds more than requested.
            for item in res.get("items", [])[:max_results]
        ]
    except HttpError as e:
        logging.error(f"Google API error: {e}")
        raise
    except Exception as e:
        logging.error(f"Unexpected Google search error: {e}")
        raise


def duckduckgo_search(query: str, max_results: int = 5):
    """Run a DuckDuckGo text search and normalize each hit.

    Every hit is mapped to a dict with "title", "link" (taken from the hit's
    "href") and "snippet" (taken from "body"); absent fields become "".
    Any failure is logged and then re-raised to the caller.
    """
    try:
        hits = []
        for entry in DDGS().text(query, max_results=max_results):
            hits.append(
                {
                    "title": entry.get("title", ""),
                    "link": entry.get("href", ""),
                    "snippet": entry.get("body", ""),
                }
            )
        return hits
    except Exception as e:
        logging.error(f"DuckDuckGo search error: {e}")
        raise
        
def wikipedia_search(query: str, max_results: int = 5, snippet_length: int = 300):
    """Search Wikipedia and return normalized result dicts.

    Args:
        query: Free-text search query.
        max_results: Maximum number of titles requested from the search.
        snippet_length: Maximum number of summary characters kept in each
            snippet before it is cut and marked with "...".

    Returns:
        A list of dicts with "title", "link" and "snippet". Titles whose page
        cannot be loaded are skipped with a warning, so the list may be
        shorter than ``max_results``.

    Raises:
        Exception: A failure of the search call itself is logged and
            re-raised.
    """
    try:
        titles = wikipedia.search(query, results=max_results)
        results = []
        for title in titles:
            try:
                # auto_suggest=False: load exactly the title the search returned.
                page = wikipedia.page(title, auto_suggest=False)
                summary = page.summary
                # Mark the snippet with an ellipsis only when text was actually
                # dropped; a short summary is returned unchanged.
                if len(summary) > snippet_length:
                    snippet = summary[:snippet_length] + "..."
                else:
                    snippet = summary
                results.append({
                    "title": page.title,
                    "link": page.url,
                    "snippet": snippet
                })
            except Exception as e:
                # Best effort: one bad page must not discard the other hits.
                logging.warning(f"Skipping problematic Wikipedia page '{title}': {e}")
        return results
    except Exception as e:
        logging.error(f"Wikipedia search error: {e}")
        raise

def search(query: str):
    """Search with the first provider that succeeds.

    Providers are tried in order: Google, then DuckDuckGo, then Wikipedia.
    A provider counts as failed only when it raises; an empty result list is
    returned as-is and does not trigger a fallback.

    Args:
        query: Free-text search query.

    Returns:
        The list of result dicts produced by the first provider that did not
        raise.

    Raises:
        RuntimeError: If every provider raised. The last provider's exception
            is attached as ``__cause__``.
    """
    providers = (
        ("Google", google_search),
        ("DuckDuckGo", duckduckgo_search),
        ("Wikipedia", wikipedia_search),
    )

    last_error = None
    for position, (name, provider) in enumerate(providers):
        if position == 0:
            logging.info(f"Trying {name} search for: {query}")
        else:
            logging.info(f"Falling back to {name} search for: {query}")
        try:
            return provider(query)
        except Exception as e:
            # Record why this provider was abandoned; otherwise the reason
            # (e.g. an uninitialized Google client) never reaches the log.
            logging.warning(f"{name} search failed: {e}")
            last_error = e

    logging.critical("All search engines failed.")
    raise RuntimeError("Search failed with all providers.") from last_error

2025-08-17 17:08:56,743 [INFO] file_cache is only supported with oauth2client<4.0.0
2025-08-17 17:08:56,745 [INFO] Google Search API initialized successfully.


In [32]:
import logging
from typing import Any, List, TypedDict

from bs4 import SoupStrainer
from langchain.chains import RetrievalQA
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import FAISS
from langchain_google_genai import GoogleGenerativeAI, GoogleGenerativeAIEmbeddings
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langgraph.graph import StateGraph, END

# --- Logging setup ---
# force=True is required here: logging.basicConfig() does nothing when the root
# logger already has a handler, and an earlier cell installs one. Without it
# the format and datefmt below are silently ignored and records keep the
# earlier cell's layout.
logging.basicConfig(
    level=logging.INFO,
    format="[%(asctime)s] [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
    force=True,
)
# Module-level logger used by the graph nodes.
logger = logging.getLogger(__name__)


class SearchRAGState(TypedDict):
    """State dict handed from node to node in the search/scrape/index/RAG graph."""

    # The user question that drives the pipeline.
    query: str
    # Result dicts returned by search(); scrape_node reads each one's "link".
    search_results: List[dict]
    # Whatever WebBaseLoader.load() returns in scrape_node. These are loader
    # document objects, not plain strings, hence Any rather than str.
    docs: List[Any]
    # Vector store built by index_node. typing.Any, not the builtin any(),
    # which is a function and not a type.
    vectorstore: Any
    # Output of the RetrievalQA step, written by rag_node.
    answer: str


def search_node(state: SearchRAGState) -> SearchRAGState:
    """Graph node: run search() on the state's query.

    Returns a copy of ``state`` with "search_results" set to the hits; the
    incoming dict is left untouched.
    """
    question = state["query"]
    logger.info(f"Searching for query: {question}")
    hits = search(question)
    logger.info(f"Got {len(hits)} search results")

    next_state = dict(state)
    next_state["search_results"] = hits
    return next_state


def scrape_node(state: SearchRAGState) -> SearchRAGState:
    """Graph node: load the page behind every search result.

    Only <p>, <h1>, <h2> and <h3> elements are parsed. A page that fails to
    load is logged and skipped. Returns a copy of ``state`` with "docs" set
    to everything that was loaded, in search-result order.
    """
    hits = state["search_results"]
    logger.info(f"Scraping {len(hits)} pages...")

    collected = []
    for hit in hits:
        try:
            page_docs = WebBaseLoader(
                hit["link"],
                bs_kwargs={"parse_only": SoupStrainer(["p", "h1", "h2", "h3"])},
            ).load()
            collected += page_docs
            logger.info(f"Scraped {len(page_docs)} docs from {hit['link']}")
        except Exception as e:
            # Best effort: one unreachable page must not abort the whole run.
            logger.warning(f"Skipping {hit['link']} due to error: {e}")

    logger.info(f"Total docs scraped: {len(collected)}")
    next_state = dict(state)
    next_state["docs"] = collected
    return next_state


def index_node(state: SearchRAGState) -> SearchRAGState:
    """Graph node: chunk the scraped documents and embed them into FAISS.

    Documents in ``state["docs"]`` are split into 500-character chunks with a
    100-character overlap, embedded with Google's "models/embedding-001" and
    stored in a FAISS vector store.

    Returns:
        A copy of ``state`` with "vectorstore" set to the new store.

    Raises:
        ValueError: If splitting produced no chunks (for example because
            every page failed to scrape). An index cannot be built from
            nothing, so this fails early with a clear message and before
            any embedding request is made.
    """
    logger.info("Splitting documents into chunks...")
    splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=100)
    chunks = splitter.split_documents(state["docs"])
    logger.info(f"✅ Created {len(chunks)} chunks")

    if not chunks:
        raise ValueError(
            "Cannot build a vector store: no text chunks were produced from the scraped pages."
        )

    logger.info("Building FAISS vectorstore with Google embeddings...")
    embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
    vectorstore = FAISS.from_documents(chunks, embeddings)
    logger.info("✅ Vectorstore ready")
    return {**state, "vectorstore": vectorstore}


def rag_node(state: SearchRAGState) -> SearchRAGState:
    """Graph node: answer the query with RetrievalQA over the vector store.

    The five most similar chunks are retrieved from ``state["vectorstore"]``
    and passed, together with the query, to a "gemini-1.5-flash" LLM.

    Returns:
        A copy of ``state`` with "answer" set to the answer text.
    """
    logger.info("Running RetrievalQA...")
    retriever = state["vectorstore"].as_retriever(
        search_type="similarity", search_kwargs={"k": 5}
    )
    llm = GoogleGenerativeAI(model="gemini-1.5-flash")
    qa = RetrievalQA.from_chain_type(llm=llm, retriever=retriever)

    response = qa.invoke(state["query"])
    # The chain returns a mapping holding the input and the output (under
    # "result"), not a bare string. Store only the answer text so the state
    # matches its declared `answer: str` field.
    if isinstance(response, dict) and "result" in response:
        answer = response["result"]
    else:
        answer = response
    logger.info("Got answer from LLM")
    return {**state, "answer": answer}


# --- Graph assembly ---
# Linear pipeline: search -> scrape -> index -> rag -> END.
workflow = StateGraph(SearchRAGState)

# Register every node first; edges are wired afterwards.
for _node_name, _node_fn in (
    ("search", search_node),
    ("scrape", scrape_node),
    ("index", index_node),
    ("rag", rag_node),
):
    workflow.add_node(_node_name, _node_fn)

workflow.set_entry_point("search")
for _source, _target in (
    ("search", "scrape"),
    ("scrape", "index"),
    ("index", "rag"),
    ("rag", END),
):
    workflow.add_edge(_source, _target)

app = workflow.compile()


if __name__ == "__main__":
    # Only the query is seeded here; the remaining state keys are added by
    # the graph nodes as the pipeline runs.
    init_state = dict(query="who is thanos")
    final_state = app.invoke(init_state)
    logger.info(f"🎯 Final Answer: {final_state['answer']}")

2025-08-17 17:13:42,337 [INFO] Searching for query: who is hakla
2025-08-17 17:13:42,338 [INFO] Trying Google search for: who is hakla
2025-08-17 17:13:42,654 [INFO] Got 5 search results
2025-08-17 17:13:42,656 [INFO] Scraping 5 pages...
2025-08-17 17:13:42,896 [INFO] Scraped 1 docs from https://www.hindustantimes.com/entertainment/bollywood/shah-rukh-khan-was-called-hakla-behind-his-back-by-many-stars-claims-singer-abhijeet-bhattacharya-101734775718319.html
2025-08-17 17:13:43,518 [INFO] Scraped 1 docs from https://en.wikipedia.org/wiki/Haka
2025-08-17 17:13:45,641 [INFO] Scraped 1 docs from https://www.reddit.com/r/TeenIndia/comments/1mepqyh/how_did_the_hakla_meme_even_start/
2025-08-17 17:13:45,823 [INFO] Scraped 1 docs from https://timesofindia.indiatimes.com/viral-hakla-shah-rukh-khan-meme-explained-why-its-still-trending-despite-reported-takedown-attempts-by-srks-team/articleshow/123088001.cms
2025-08-17 17:13:45,971 [INFO] Scraped 1 docs from https://vocal.media/geeks/what-is-th

GoogleGenerativeAIError: Error embedding content: 403 Requests to this API generativelanguage.googleapis.com method google.ai.generativelanguage.v1beta.GenerativeService.BatchEmbedContents are blocked. [reason: "API_KEY_SERVICE_BLOCKED"
domain: "googleapis.com"
metadata {
  key: "service"
  value: "generativelanguage.googleapis.com"
}
metadata {
  key: "methodName"
  value: "google.ai.generativelanguage.v1beta.GenerativeService.BatchEmbedContents"
}
metadata {
  key: "consumer"
  value: "projects/1060660884177"
}
metadata {
  key: "apiName"
  value: "generativelanguage.googleapis.com"
}
, locale: "en-US"
message: "Requests to this API generativelanguage.googleapis.com method google.ai.generativelanguage.v1beta.GenerativeService.BatchEmbedContents are blocked."
]