In [1]:
pip install langchain-google-genai langchain-core langchain-community beautifulsoup4 requests python-dotenv aiohttp

Note: you may need to restart the kernel to use updated packages.


In [95]:
import os
from getpass import getpass

# Never store the API key in the notebook: saved cells (and their outputs) are
# shared and committed along with the code. Use the key already present in the
# environment, otherwise prompt for it without echoing it to the screen.
# NOTE(review): any key that was previously hardcoded in this cell should be
# treated as leaked and revoked.
if not os.environ.get("GEMINI_API_KEY"):
    os.environ["GEMINI_API_KEY"] = getpass("Gemini API key: ")

In [35]:
import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.action_chains import ActionChains
from lxml import html
import time
import random
from tqdm import tqdm
from typing import List

In [26]:
import google.generativeai as genai
import requests
from typing import List, Dict, Optional, Tuple
from tqdm import tqdm
from bs4 import BeautifulSoup
from langchain_community.tools import DuckDuckGoSearchRun
from dotenv import load_dotenv
import os

# Configure the google.generativeai client with the key read from the
# GEMINI_API_KEY environment variable; the lookup raises KeyError if it is unset.
genai.configure(api_key=os.environ["GEMINI_API_KEY"])

In [28]:
def get_gemini_model(model_name = "gemini-pro", temperature = 0.4):
    """Build a Gemini ``GenerativeModel`` for the given model name.

    Args:
        model_name: Identifier of the model to instantiate.
            NOTE(review): confirm "gemini-pro" is still served by the API.
        temperature: Sampling temperature stored as the model's default
            generation config. Previously this argument was accepted but
            silently ignored.

    Returns:
        The ``genai.GenerativeModel`` instance.

    Note:
        A ``generation_config`` passed explicitly to ``generate_content``
        (as ``generate_gemini_response`` does) is applied per request and
        takes precedence over this default.
    """
    return genai.GenerativeModel(
        model_name,
        generation_config={"temperature": temperature},
    )

In [30]:
def get_generation_config(temperature = 0.4):
    """Return the generation settings dict used for Gemini requests.

    Args:
        temperature: Value stored under the "temperature" key.

    Returns:
        A new dict with "temperature", "top_p" (1), "top_k" (1) and
        "max_output_tokens" (2048).
    """
    config = {"temperature": temperature}
    # The remaining knobs are fixed for every request.
    config.update(top_p=1, top_k=1, max_output_tokens=2048)
    return config

In [31]:
def get_safety_settings():
    """Return one safety-setting entry per harm category.

    Every entry uses the "BLOCK_NONE" threshold.

    Returns:
        A list of ``{"category": ..., "threshold": "BLOCK_NONE"}`` dicts, in
        the order harassment, hate speech, sexually explicit, dangerous content.
    """
    harm_categories = (
        "HARM_CATEGORY_HARASSMENT",
        "HARM_CATEGORY_HATE_SPEECH",
        "HARM_CATEGORY_SEXUALLY_EXPLICIT",
        "HARM_CATEGORY_DANGEROUS_CONTENT",
    )
    settings = []
    for name in harm_categories:
        settings.append({"category": name, "threshold": "BLOCK_NONE"})
    return settings

In [62]:
def generate_gemini_response(model, prompt):
    """Send ``prompt`` to ``model`` and return the text of the first candidate.

    The request uses the settings from ``get_generation_config()`` and
    ``get_safety_settings()``.

    Args:
        model: Object exposing ``generate_content(prompt, generation_config=...,
            safety_settings=...)``.
        prompt: Prompt passed through to the model unchanged.

    Returns:
        The ``text`` of the first part of the first candidate, or ``''`` when
        the response has no candidates or the first candidate has no parts.
    """
    response = model.generate_content(
        prompt,
        generation_config=get_generation_config(),
        safety_settings=get_safety_settings()
    )

    candidates = response.candidates
    if not candidates:
        return ''

    # A candidate can come back without any content parts; indexing parts[0]
    # unconditionally would raise IndexError instead of yielding an empty reply.
    parts = candidates[0].content.parts
    if not parts:
        return ''

    return parts[0].text

In [89]:
def create_search_prompt(query, context = ""):
    """Build the routing prompt that asks the model whether a web search is needed.

    Args:
        query: The user's question, appended as a ``Query:`` section.
        context: Optional context; when truthy it is inserted as a
            ``Context:`` section before the query.

    Returns:
        The instruction text followed by the optional context section and the
        query section, separated by blank lines.
    """
    system_prompt = """You are a smart assistant designed to determine whether a query needs data from a web search or can be answered using a document database. 
    Consider the provided context if available. 
    If the query requires external information, No context is provided, Irrelevent context is present or latest information is required, then output the special token <SEARCH> 
    followed by relevant keywords extracted from the query to optimize for search engine results. 
    Ensure the keywords are concise and relevant. If document data is sufficient, simply return blank."""

    sections = [system_prompt]
    if context:
        sections.append(f"Context: {context}")
    sections.append(f"Query: {query}")
    return "\n\n".join(sections)

In [70]:
def create_summary_prompt(content):
    """Wrap ``content`` in the summarisation prompt.

    Args:
        content: Material to summarise; it is interpolated into the prompt
            under the "Content to summarize:" heading.

    Returns:
        The full prompt string, ending with a "Summary:" cue.
    """
    return (
        "Please provide a comprehensive yet concise summary of the following content, "
        "highlighting the most important points and maintaining factual accuracy. "
        "Organize the information in a clear and coherent manner:\n"
        "\n"
        "Content to summarize:\n"
        f"{content}\n"
        "\n"
        "Summary:"
    )


In [71]:
def init_selenium_driver():
    """Create a Chrome WebDriver configured with the notebook's fixed flags.

    Returns:
        The ``webdriver.Chrome`` instance built from those options.
    """
    launch_flags = (
        "--headless",
        "--disable-gpu",
        "--no-sandbox",
        "--disable-dev-shm-usage",
    )

    chrome_options = Options()
    for flag in launch_flags:
        chrome_options.add_argument(flag)

    return webdriver.Chrome(options=chrome_options)


In [72]:
def extract_static_page(url, max_chars = 5000, timeout = 5):
    """Fetch ``url`` with ``requests`` and return its visible text, truncated.

    Args:
        url: Address of the page to download.
        max_chars: Maximum number of characters of extracted text to return
            (default 5000, the previously hard-coded limit).
        timeout: Timeout in seconds passed to ``requests.get`` (default 5,
            the previously hard-coded value).

    Returns:
        The page text with tags removed and fragments joined by single spaces,
        cut to ``max_chars`` characters; ``None`` if the request fails or
        returns an error status.
    """
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'lxml')

        text = soup.get_text(separator=" ", strip=True)
        return text[:max_chars]

    except requests.exceptions.RequestException as e:
        # Best-effort scraping: report the failure and let the caller skip this URL.
        print(f"Error fetching page: {e}")
        return None
        
def extract_dynamic_page(url, driver, max_chars = 1000):
    """Load ``url`` in a Selenium ``driver`` and return the body text, truncated.

    Args:
        url: Address of the page to open.
        driver: Selenium WebDriver used to load the page. It is not closed
            here; the caller owns its lifetime.
        max_chars: Maximum number of characters of extracted text to return
            (default 1000, the previously hard-coded limit).

    Returns:
        All text nodes under ``<body>`` joined by spaces, stripped and cut to
        ``max_chars`` characters; ``None`` if any step raises.
    """
    try:
        driver.get(url)
        # Randomised pause after navigation, before touching the page.
        time.sleep(random.uniform(2, 5))

        body = driver.find_element(By.TAG_NAME, "body")
        ActionChains(driver).move_to_element(body).perform()
        # Second randomised pause after the pointer move, before reading the source.
        time.sleep(random.uniform(2, 5))

        page_source = driver.page_source
        tree = html.fromstring(page_source)

        text = tree.xpath('//body//text()')
        text_content = ' '.join(text).strip()
        return text_content[:max_chars]

    except Exception as e:
        # Best-effort scraping: report the failure and let the caller skip this URL.
        print(f"Error fetching dynamic page: {e}")
        return None


In [73]:
def scrape_page(url):
    """Extract text from ``url``, choosing the static or Selenium-based path.

    URLs containing the substring "javascript" or "dynamic" are loaded through
    a freshly created Selenium driver; every other URL is fetched with
    ``extract_static_page``.

    Args:
        url: Address of the page to scrape.

    Returns:
        Whatever the selected extractor returns (text, or ``None`` on failure).
    """
    if "javascript" in url or "dynamic" in url:
        driver = init_selenium_driver()
        try:
            text = extract_dynamic_page(url, driver)
        finally:
            # Always shut the browser down, even if extraction is interrupted
            # or raises, so no Chrome process is left behind.
            driver.quit()
    else:
        text = extract_static_page(url)

    return text

In [74]:
def scrape_web(urls, max_urls = 5):
    """Scrape the first ``max_urls`` entries of ``urls`` and collect their text.

    Args:
        urls: Sequence of page addresses.
        max_urls: Number of leading URLs to process.

    Returns:
        A list with the text of every page that produced a truthy result, in
        input order. Pages that yield nothing are reported on stdout and skipped.
    """
    collected = []
    selected = urls[:max_urls]

    for link in tqdm(selected, desc="Scraping websites"):
        page_text = scrape_page(link)

        if not page_text:
            print(f"Failed to retrieve content from {link}")
            continue

        collected.append(page_text)

    return collected

In [75]:
def check_search_needed(model, query , context):
    """Ask the model whether ``query`` needs a web search and extract the keywords.

    Args:
        model: Model object forwarded to ``generate_gemini_response``.
        query: The user's question.
        context: Context forwarded to ``create_search_prompt``.

    Returns:
        ``(True, search_terms)`` when the reply contains the ``<SEARCH>`` token,
        where ``search_terms`` is the stripped text following the first token
        (up to a second token, if any). If that text is empty, ``query`` itself
        is used so the caller never searches for an empty string.
        ``(False, None)`` when the token is absent.
    """
    prompt = create_search_prompt(query , context)
    response = generate_gemini_response(model, prompt)

    marker = "<SEARCH>"
    if marker not in response:
        return False, None

    search_terms = response.split(marker)[1].strip()
    # The model may emit the bare token with no keywords; fall back to the query.
    return True, search_terms or query

In [76]:
def summarize_content(model, content):
    """Return the model's summary of ``content``.

    Args:
        model: Model object forwarded to ``generate_gemini_response``.
        content: Material wrapped by ``create_summary_prompt``.

    Returns:
        The text produced by ``generate_gemini_response`` for the summary prompt.
    """
    return generate_gemini_response(model, create_summary_prompt(content))

In [77]:
def process_query(query , context = ''):
    """Decide whether ``query`` needs a web search and, if so, search and summarise.

    Args:
        query: The user's question.
        context: Optional context forwarded to ``check_search_needed``.

    Returns:
        A dict with the keys "original_query", "needs_search", "search_terms",
        "web_content", "summary" and "raw_response". "web_content" and
        "summary" are filled only when a search was performed; "raw_response"
        is always ``None``.
    """
    model = get_gemini_model()
    search_tool = DuckDuckGoSearchRun()

    needs_search, search_terms = check_search_needed(model, query , context)

    web_content = None
    summary = None
    if needs_search:
        # Search with the extracted keywords, then condense the raw results.
        web_content = search_tool.run(search_terms)
        summary = summarize_content(model, web_content)

    return {
        "original_query": query,
        "needs_search": needs_search,
        "search_terms": search_terms,
        "web_content": web_content,
        "summary": summary,
        "raw_response": None,
    }

In [78]:
def run(question , context = ''):
    """Process ``question`` and print whether a search was needed and its summary.

    Args:
        question: The user's question.
        context: Optional context forwarded to ``process_query``. Previously
            this argument was discarded and an empty string was always passed.

    Returns:
        None. The outcome is printed to stdout.
    """
    result = process_query(question , context = context)
    print("\nQuery Results:")
    print(f"Search needed: {result['needs_search']}")

    if result['needs_search']:
        print(f"\nSearch terms used: {result['search_terms']}")
        print("\nSummary of findings:")
        print(result['summary'])

In [102]:
# Example invocation: a single question with no context argument supplied.
run('Give me the name of highest rated GoT episode in the whole series')

  ddgs_gen = ddgs.text(



Query Results:
Search needed: True

Search terms used: highest rated GoT episode

Summary of findings:
**Highest-Rated Game of Thrones Episodes**

**"The Winds of Winter" (Season 5, Episode 10)**

* Key events: Deaths of Jon Snow, Stannis Baratheon, Myrcella Baratheon, Meryn Trant; Cersei Lannister's walk of shame
* Notable for its numerous game-changing moments and deaths

**"The Rains of Castamere" (Season 3, Episode 9)**

* Key events: Red Wedding massacre
* Remembered for its shocking and devastating impact

**Other Notable Episodes with IMDb Ratings of 9 or Above**

* Game of Thrones has 29 episodes rated 9 or above on IMDb, indicating its overall high quality

**Additional Points:**

* The sixth season finale features a stunning opening montage that foreshadows chaos.
* Game of Thrones was widely regarded as the best show on television for nearly a decade.
* Despite a divisive final season, HBO's adaptation of George R. R. Martin's fantasy series remains highly acclaimed.
