In [None]:
import os
import requests
from duckduckgo_search import DDGS

import os
import sys
sys.path.append(os.path.abspath("../"))
from autogen import initiate_chats, GroupChat, GroupChatManager
from funes.agents.agent_types import Role, Persona, AutogenAgentType
from autogen.agentchat.contrib.multimodal_conversable_agent import MultimodalConversableAgent
from llm_foundation import logger

from dotenv import find_dotenv, load_dotenv

from IPython.display import Image
from pathlib import Path

openai_api_key = os.environ["OPENAI_API_KEY"]

config_list = [{
    "organization": "org-Q3NBjIa66lyyJiZUKXYOXzBf",
    "model": "gpt-4o-mini",  # model name
    "api_key": openai_api_key  # api key
}]
llm_config = {
    "seed": 14,  # seed for caching and reproducibility
    "config_list": config_list,  # a list of OpenAI API configurations
    "temperature": 0.0,  # temperature for sampling
}


In [None]:
path = Path("./images/l1_and_l2_regularization_1.jpg").resolve()

print(path)

francisco = Persona.from_json_file("Persona/Francisco.json")
print(francisco)
user_proxy = francisco.role_to_autogen_agent("learner", AutogenAgentType.UserProxyAgent, llm_config=llm_config)


image_agent = MultimodalConversableAgent(
    name="image-explainer",
    max_consecutive_auto_reply=1,
    llm_config=llm_config,
)


prompt = f"""What's in this image? <img {path}>."""

print(prompt)

user_proxy.initiate_chat(
    image_agent,
    message=prompt,
)

In [None]:
!ls

In [None]:
print(path)
Image(url=path)


In [None]:
# Function to download images
def download_images(search_query, num_images=5, provider="DDG", directory="images"):
    search_query = search_query.replace(" ", "_")
    
    # Search for images using DuckDuckGo
    results = DDGS().images(
        keywords=search_query,
        region="us-en",
        safesearch="off",
        size=None,
        color="color",
        type_image=None,
        layout="Wide",
        license_image=None,
        max_results=num_images,
    )

    # Create a directory to save downloaded images
    if not os.path.exists(directory):
        os.makedirs(directory)

    # Loop through results and download each image
    for i, result in enumerate(results):
        image_url = result['image']
        
        print(f"Downloading image {i+1} from {image_url}")
        try:
            response = requests.get(image_url)
            if response.status_code == 200:
                image_path = f"{directory}/{search_query}_{i+1}.jpg"
                with open(image_path, 'wb') as file:
                    file.write(response.content)
                print(f"Image saved at {image_path}")
            else:
                print(f"Failed to download image {i+1}. Status code: {response.status_code}")
        except Exception as e:
            print(f"An error occurred while downloading image {i+1}: {e}")

# Example: Search for and download images of "puppies"
download_images("l1 and l2 regularization", num_images=5)


In [1]:
import pprint

from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import AsyncHtmlLoader
from langchain_community.document_loaders import AsyncChromiumLoader
from langchain_community.document_transformers import BeautifulSoupTransformer


USER_AGENT environment variable not set, consider setting it to identify your requests.


In [None]:
import asyncio
import nest_asyncio

nest_asyncio.apply()

url = "https://indianexpress.com/section/technology/"


loader = AsyncChromiumLoader([url])
docs = loader.load()
bs_transformer = BeautifulSoupTransformer()
docs_transformed = bs_transformer.transform_documents(
    docs, tags_to_extract=["span"]
)
print("Extracting content with LLM")

# Grab the first 1000 tokens of the site
splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=1000, chunk_overlap=0
)
splits = splitter.split_documents(docs_transformed)



print(splits)


# async def load_html():
#     html = loader.load()
#     return html

# loop = asyncio.get_event_loop()
# # html = asyncio.create_task(load_html())
# html = loop.run_until_complete(load_html())
# html

In [None]:
print(splits)
print(splits[0].page_content)
print(type(splits[0].page_content))
print(len(splits[0].page_content))


In [4]:
import asyncio
import nest_asyncio
nest_asyncio.apply()

from playwright.sync_api import sync_playwright
from playwright.async_api import async_playwright
from playwright_stealth import stealth_sync, stealth_async
from bs4 import BeautifulSoup
import traceback

# Web Scraper class definition
class WebScraper:
    def __init__(self, headless=True, browser_type="chromium", chunk_size=256, max_tokens=1000):
        self.headless = headless
        self.browser_type = browser_type
        self.chunk_size = chunk_size
        self.max_tokens = max_tokens

    def scrape_page(self, url: str) -> str:
        with sync_playwright() as p:
            browser = getattr(p, self.browser_type).launch(
                headless=self.headless,
                args=["--disable-gpu", "--no-sandbox"]
            )
            context = browser.new_context()
            page = context.new_page()

            stealth_sync(page)
            page.goto(url)

            html_content = page.content()
            browser.close()
        return html_content


    async def a_scrape_page(self, url: str) -> str:
        # with sync_playwright() as p:
        async with async_playwright() as p:
            browser = await getattr(p, self.browser_type).launch(
                headless=self.headless,
                args=["--disable-gpu", "--no-sandbox"]
            )
            context = await browser.new_context()
            page = await context.new_page()
            await stealth_async(page)
            await page.goto(url)

            html_content = await page.content()
            await browser.close()
        return html_content

    def extract_titles_articles_links(self, raw_html: str) -> list:
        soup = BeautifulSoup(raw_html, 'html.parser')
        extracted_data = []
        visited_links = set()
        for article in soup.find_all(['article', 'section', 'div']):
            title_tag = article.find(['h1', 'h2', 'h3', 'h4', 'h5', 'h6'])
            link_tag = article.find('a', href=True)
            content = article.get_text(separator="\n", strip=True)
            
            if title_tag and link_tag and content and link_tag['href'] not in visited_links:
                extracted_data.append({
                    'title': title_tag.get_text(strip=True),
                    'link': link_tag['href'],
                    'content': content
                })
                visited_links.add(link_tag['href'])
        
        return extracted_data

    def query_page_content(self, url: str) -> dict:
        raw_html = self.scrape_page(url)
        structured_data = {
            "url": url,
            "extracted_data": self.extract_titles_articles_links(raw_html),
            "raw_html": raw_html
        }
        return structured_data


    async def a_query_page_content(self, url: str) -> dict:
        raw_html = await self.a_scrape_page(url)
        structured_data = {
            "url": url,
            "extracted_data": self.extract_titles_articles_links(raw_html),
            "raw_html": raw_html
        }
        return structured_data



def query_web_scraper(url: str) -> dict:
    scraper = WebScraper(headless=False)
    return scraper.query_page_content(url)


async def a_query_web_scraper(url: str) -> dict:
    scraper = WebScraper(headless=True)
    return await scraper.a_query_page_content(url)


url = "https://medium.com/javarevisited/what-i-learned-from-the-book-system-design-interview-an-insider-guide-77562e48cdaa"
html = asyncio.create_task(a_query_web_scraper(url))
# loop = asyncio.get_event_loop()
# html = loop.run_until_complete(a_query_web_scraper(url))
await html


{'url': 'https://medium.com/javarevisited/what-i-learned-from-the-book-system-design-interview-an-insider-guide-77562e48cdaa',
 'extracted_data': [{'title': 'Book Review — System Design Interview — An Insider Guide',
   'link': 'https://rsci.app.link/?%24canonical_url=https%3A%2F%2Fmedium.com%2Fp%2F77562e48cdaa&%7Efeature=LoOpenInAppButton&%7Echannel=ShowPostUnderCollection&source=---two_column_layout_nav----------------------------------',
   'content': "Open in app\nSign up\nSign in\nWrite\nSign up\nSign in\nBook Review — System Design Interview — An Insider Guide\nMy review of System Design Interview: An Insider’s Guide book by Alex Xu\nSoma\n·\nFollow\nPublished in\nJavarevisited\n·\n6 min read\n·\nApr 9, 2024\n--\n2\nListen\nShare\nI recently read the book\n“System Design Interview: An Insider’s Guide”\nauthored by Alex Xu, creator of “\nByteByteGo\n”(digital version of this book and volume 2 + more content) and a famous System Design expert on LinkedIn, and this is my review of t

In [32]:
import json
import re


array_string = "[snippet: This system design interview guide was written with feedback from over 50 EM, SWE, and TPM technical interview coaches at startups and MAANG+ companies such as Microsoft, Amazon, Meta, Google, Netflix, Dropbox, and Stripe. Collectively, they've conducted thousands of system design interviews with real candidates and teams., title: Nail the System Design Interview: Complete Guide | Exponent, link: https://www.tryexponent.com/blog/system-design-interview-guide], [snippet: You can also refer to our system design interview prep guide and our list of 19 system design interview tips from ex-interviewers. Otherwise, let's start with preparation step 1. 1. Learn the concepts. There is a base level of knowledge required to be able to speak intelligently about system design. You don't need to know EVERYTHING about ..., title: 11 Most-Asked System Design Interview Questions (+ answers), link: https://igotanoffer.com/blogs/tech/system-design-interviews]"

def extract_json_from_string(string):
    extracted_refs = []
    pattern = r"\[snippet: (.*?), title: (.*?), link: (.*?)\]"
    matches = re.findall(pattern, string)
    ref = {}
    for match in matches:
        ref["snippet"] = match[0]
        ref["title"] = match[1]
        ref["link"] = match[2]
        extracted_refs.append(ref)
    return extracted_refs

extracted_data = extract_json_from_string(array_string)
import pprint
pprint.pprint(extracted_data)

[{'link': 'https://igotanoffer.com/blogs/tech/system-design-interviews',
  'snippet': 'You can also refer to our system design interview prep guide and '
             'our list of 19 system design interview tips from '
             "ex-interviewers. Otherwise, let's start with preparation step 1. "
             '1. Learn the concepts. There is a base level of knowledge '
             'required to be able to speak intelligently about system design. '
             "You don't need to know EVERYTHING about ...",
  'title': '11 Most-Asked System Design Interview Questions (+ answers)'},
 {'link': 'https://igotanoffer.com/blogs/tech/system-design-interviews',
  'snippet': 'You can also refer to our system design interview prep guide and '
             'our list of 19 system design interview tips from '
             "ex-interviewers. Otherwise, let's start with preparation step 1. "
             '1. Learn the concepts. There is a base level of knowledge '
             'required to be able to 