In [None]:
import requests
from bs4 import BeautifulSoup

def get_top_google_results(query, num_results=5):
    """
    Fetch the top URL pages from Google search results for a given query without using an API.

    Args:
        query (str): Search query.
        num_results (int): Number of top results to fetch (default: 5).

    Returns:
        list: A list of URLs of the top search results.
    """
    # Encode the query for the URL
    query = query.replace(" ", "+")
    google_url = f"https://www.google.com/search?q={query}&num={num_results}"

    # Set user-agent to mimic a browser
    headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        )
    }

    # Send a GET request to Google
    response = requests.get(google_url, headers=headers)
    response.raise_for_status()

    # Parse the response with BeautifulSoup
    soup = BeautifulSoup(response.text, "html.parser")

    # Extract URLs from search results
    urls = []
    for g in soup.find_all('div', class_='tF2Cxc'):  # Google search result containers
        link = g.find('a', href=True)
        if link and link['href']:
            urls.append(link['href'])
            if len(urls) == num_results:
                break

    return urls

# Example usage
query = "apple's revenue in 2024"
top_urls = get_top_google_results(query)

urls_list = []

print("Top 5 URLs:")
for idx, url in enumerate(top_urls, start=1):
    print(f"{idx}: {url}")
    urls_list.append(url)


In [None]:
import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
from crawl4ai.content_filter_strategy import BM25ContentFilter
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator

async def main(query, url):
    # 1) A BM25 filter with a user query
    bm25_filter = BM25ContentFilter(
        user_query=query,
        # Adjust for stricter or looser results
        bm25_threshold=1.2  
    )

    # 2) Insert into a Markdown Generator
    md_generator = DefaultMarkdownGenerator(content_filter=bm25_filter)

    # 3) Pass to crawler config
    config = CrawlerRunConfig(
        markdown_generator=md_generator,
        word_count_threshold=10,
        excluded_tags=["nav", "footer", "header"],
        exclude_external_links=True,
        exclude_external_images=True.as_integer_ratio,
        process_iframes=True,    
        remove_overlay_elements=True
    )

    

    async with AsyncWebCrawler() as crawler:
        result = await crawler.arun(
            url=url, 
            config=config
        )
        if result.success:
            print("Fit Markdown (BM25 query-based):")
            print(result.markdown)
            print('-'*47)
        else:
            print("Error:", result.error_message)
        
        return result

if __name__ == "__main__":
    result = asyncio.run(main(query="Private equity backed companies based in Europe", url="https://en.wikipedia.org/wiki/List_of_private_equity_firms"))

In [1]:
import asyncio
from typing import List, Union, Tuple
import aiohttp
import io
import pdfplumber
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
from crawl4ai.content_filter_strategy import BM25ContentFilter
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator

async def is_pdf_url(session: aiohttp.ClientSession, url: str) -> bool:
    try:
        async with session.head(url, allow_redirects=True) as response:
            if 'application/pdf' in response.headers.get('Content-Type', '').lower():
                return True
        async with session.get(url) as response:
            chunk = await response.content.read(5)
            return chunk.startswith(b'%PDF-')
    except Exception:
        return False

async def extract_pdf_text(session: aiohttp.ClientSession, url: str) -> str:
    async with session.get(url) as response:
        pdf_content = await response.read()
    with pdfplumber.open(io.BytesIO(pdf_content)) as pdf:
        return "\n".join(page.extract_text() for page in pdf.pages if page.extract_text())

async def process_url(session: aiohttp.ClientSession, url: str, crawler: AsyncWebCrawler, 
                     config: CrawlerRunConfig) -> Tuple[bool, Union[str, Exception]]:
    try:
        if await is_pdf_url(session, url):
            text = await extract_pdf_text(session, url)
            return True, text
        else:
            result = await crawler.arun(url=url, config=config)
            if result.success:
                return True, result.markdown
            return False, f"Crawl failed: {result.error_message}"
    except Exception as e:
        return False, e

async def crawl_parallel(urls: List[str], max_concurrent: int = 3) -> List[Tuple[str, bool, Union[str, Exception]]]:
    crawl_config = CrawlerRunConfig(
        word_count_threshold=10,
        excluded_tags=['script', 'style', 'nav', 'header', 'footer', 
                      'iframe', 'form', 'button', 'img', 'noscript',
                      'svg', 'input', 'meta', 'select', 'textarea'],
        exclude_external_links=True,
        exclude_external_images=True,
        process_iframes=False,
        remove_overlay_elements=True
    )

    crawler = AsyncWebCrawler()
    await crawler.start()
    
    # Create a semaphore to limit concurrent tasks
    semaphore = asyncio.Semaphore(max_concurrent)

    async def bounded_process_url(url: str) -> Tuple[str, bool, Union[str, Exception]]:
        async with semaphore:
            try:
                async with aiohttp.ClientSession() as session:
                    success, content = await process_url(session, url, crawler, crawl_config)
                    return url, success, content
            except Exception as e:
                return url, False, str(e)

    try:
        # Process all URLs concurrently with bounded parallelism
        tasks = [bounded_process_url(url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

    finally:
        await crawler.close()
        

async def main(urls: List[str]) -> List[Tuple[str, bool, Union[str, Exception]]]:
    results = await crawl_parallel(urls,  max_concurrent=10)
    
    # # Print results (optional)
    # for url, success, content in results:
    #     if success:
    #         print(f"Successfully processed {url}")
    #         print(content[:200] + "..." if len(content) > 200 else content)
    #     else:
    #         print(f"Failed to process {url}: {content}")
    #     print('-' * 47)
    
    return results

In [None]:
if __name__ == "__main__":
    urls = ['https://www.designboom.com/technology/meta-true-ar-glasses-orion-smartphones-hands-free-wearable-ai-device-09-26-2024/#:~:text=Meta%20unveils%20Orion%2C%20its%20dubbed,September%2025th%20and%2026th%2C%202024.',
   'https://www.pymnts.com/connectedeconomy/2024/zuckerberg-unveils-metas-wearable-tech-plans-for-the-connected-economy/',
   'https://sherwood.news/tech/meta-ray-ban-apple-vision-pro-competition/',
   'https://www.theverge.com/2024/10/11/24267633/meta-hardware-glasses-quest-andrew-bosworth-interview',
   'https://medium.com/@jcorcione/meta-reorganizes-introduces-wearables-unit-despite-job-cuts-d19f1dd10aa6',
   "https://www.aku.edu/admissions/Documents/student-health-services.pdf",
   'https://insidetelecom.com/wearable-technology-competition-between-apple-and-meta/',
   'https://twit.tv/posts/tech/meta-and-future-wearable-tech',
   'https://en.wikipedia.org/wiki/Smartglasses']
    
    urls = ["https://www.google.com/search?q=what+is+the+revenue+of+bykea&rlz=1C5CHFA_enPK1041PK1041&oq=what+is+the+revenue+of+bykea&gs_lcrp=EgZjaHJvbWUyBggAEEUYOTIICAEQABgWGB4yDQgCEAAYhgMYgAQYigUyDQgDEAAYhgMYgAQYigUyDQgEEAAYhgMYgAQYigUyDQgFEAAYhgMYgAQYigUyCggGEAAYgAQYogQyCggHEAAYgAQYogQyCggIEAAYgAQYogQyBwgJEAAY7wXSAQg2MTk1ajBqN6gCALACAA&sourceid=chrome&ie=UTF-8"]

    results = await (main(urls))

In [None]:
print(results[0][2])

In [None]:
results

In [None]:
res

In [None]:
# import asyncio
# from typing import List
# from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
# from crawl4ai.content_filter_strategy import BM25ContentFilter
# from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator

# async def crawl_parallel(urls: List[str], query: str, max_concurrent: int = 3):
#     # Initialize BM25 filter
#     bm25_filter = BM25ContentFilter(
#         user_query=query,
#         bm25_threshold=1.2  # Adjust for stricter or looser results
#     )

#     # Set up Markdown Generator
#     md_generator = DefaultMarkdownGenerator(content_filter=bm25_filter)

#     # Configure crawler
#     crawl_config = CrawlerRunConfig(
#         # markdown_generator=md_generator,
#         word_count_threshold=10,
#         excluded_tags=['script', 'style', 'nav', 'header', 'footer', 
#                                     'iframe', 'form', 'button', 'img', 'noscript',
#                                     'svg', 'input', 'meta', 'select', 'textarea'],
#         exclude_external_links=True,
#         exclude_external_images=True,
#         process_iframes=False,
#         remove_overlay_elements=True
#     )

#     # Create crawler instance
#     crawler = AsyncWebCrawler()
#     await crawler.start()
#     # Process URLs in parallel batches
#     results = []

#     try:
#         for i in range(0, len(urls), max_concurrent):
#             batch = urls[i : i + max_concurrent]
#             tasks = [crawler.arun(url=url, config=crawl_config) for url in batch]  # Await coroutine
#             batch_results = await asyncio.gather(*tasks, return_exceptions=True)
#             results.extend(batch_results)

#         # Process results
#         for url, result in zip(urls, results):
#             if isinstance(result, Exception):
#                 print(f"Error crawling {url}: {result}")
#             elif result.success:
#                 print(f"Markdown for {url} (BM25 query-based):")
#                 print(result.markdown_v2.fit_markdown)
#                 print('-' * 47)
#                 print(result.markdown)
#                 print('-' * 47)
#             else:
#                 print(f"Failed to crawl {url}: {result.error_message}")

#     finally:
#         # Close the crawler
#         await crawler.close()
#         return results

# async def main(urls):  # Replace with your URLs
#     query = "Apple's Revenue in 2024"
#     results = await crawl_parallel(urls, query, max_concurrent=10)
#     return results

# if __name__ == "__main__":
#     urls = ['https://www.designboom.com/technology/meta-true-ar-glasses-orion-smartphones-hands-free-wearable-ai-device-09-26-2024/#:~:text=Meta%20unveils%20Orion%2C%20its%20dubbed,September%2025th%20and%2026th%2C%202024.',
#    'https://www.pymnts.com/connectedeconomy/2024/zuckerberg-unveils-metas-wearable-tech-plans-for-the-connected-economy/',
#    'https://sherwood.news/tech/meta-ray-ban-apple-vision-pro-competition/',
#    'https://www.theverge.com/2024/10/11/24267633/meta-hardware-glasses-quest-andrew-bosworth-interview',
#    'https://medium.com/@jcorcione/meta-reorganizes-introduces-wearables-unit-despite-job-cuts-d19f1dd10aa6',
#    'https://insidetelecom.com/wearable-technology-competition-between-apple-and-meta/',
#    'https://twit.tv/posts/tech/meta-and-future-wearable-tech',
#    'https://en.wikipedia.org/wiki/Smartglasses']
#     urls = ["https://www.aku.edu/admissions/Documents/student-health-services.pdf"]
#     results = await (main(urls))


In [None]:
results

# Approach 2

In [2]:
import re
import ftfy

def clean_text(text: str) -> str:
        """Enhanced text cleaning function"""
        try:
            # Basic cleaning
            text = ftfy.fix_text(text)  # Fix encoding issues
            text = re.sub(r"http\S+|www\S+|https\S+", "", text)  # Remove URLs
            text = re.sub(r'\S+@\S+', '', text)  # Remove Emails
            text = re.sub(r'\[.*?\]|\(.*?\)', '', text)  # Remove markdown links
            
            # Remove special characters and normalize spaces
            text = re.sub(r'[^\x00-\x7F]+', '', text)  # Remove non-ASCII characters
            text = re.sub(r'[\r\n\t]+', ' ', text)  # Replace newlines and tabs with space
            text = re.sub(r'\s+', ' ', text)  # Normalize whitespace
            
            # Remove common noise patterns
            text = re.sub(r'Share\s+on\s+\w+', '', text, flags=re.IGNORECASE)  # Remove social sharing text
            text = re.sub(r'cookie[s]?\s+policy', '', text, flags=re.IGNORECASE)  # Remove cookie notices
            text = re.sub(r'subscribe|sign up|newsletter', '', text, flags=re.IGNORECASE)  # Remove promotional text
            
            # Remove very long strings (likely garbage)
            text = ' '.join(word for word in text.split() if len(word) < 50)
            
            return text.strip()
        except Exception as e:
            print(f"Error in clean_text: {str(e)}")
            return text

In [15]:
import json

snippets = []
with open('snippets.json') as f:
    snippets = json.load(f)

In [16]:
snippets_data = []
for item in snippets:
    snippets_list = []
    urls_list = []
    for snp in item['query_result']:
        snippets_list.append(snp['snippet'])
        urls_list.append(snp['link'])
    snippets_data.append({'query': item['query'], 'snippets': snippets_list, 'urls': urls_list})

In [20]:
crawled_list_all = []
for item in snippets_data:
    crawled_list = await main(item['urls'])
    snippets_list = []
    content_list = []
    assert len(item['snippets']) == len(item['urls'])
    print('-'*26)
    print(len(item['snippets']), len(item['urls']))

    for cr, sp, url in zip(crawled_list, item['snippets'], item['urls']):
        if str(cr[0]) == url and cr[1]:
            snippets_list.append(sp)
            content_list.append(clean_text(cr[2]))
        else:
            snippets_list.append(sp)
            content_list.append("")

        assert len(snippets_list) == len(content_list)
        
    print('-'*26)
    print(len(snippets_list), len(content_list))
            
    crawled_list_all.append({"query": item["query"],"snippets": snippets_list, "content": content_list})
    
        


[INIT].... → Crawl4AI 0.4.247
[FETCH]... ↓ https://www.theverge.com/2024/10/11/24267633/meta-... | Status: True | Time: 0.01s
[COMPLETE] ● https://www.theverge.com/2024/10/11/24267633/meta-... | Status: True | Total: 0.05s
[FETCH]... ↓ https://medium.com/@jcorcione/meta-reorganizes-int... | Status: True | Time: 0.00s
[COMPLETE] ● https://medium.com/@jcorcione/meta-reorganizes-int... | Status: True | Total: 0.04s
[FETCH]... ↓ https://finance.yahoo.com/news/wearable-devices-bo... | Status: True | Time: 0.01s
[COMPLETE] ● https://finance.yahoo.com/news/wearable-devices-bo... | Status: True | Total: 0.05s
[FETCH]... ↓ https://sherwood.news/tech/meta-ray-ban-apple-visi... | Status: True | Time: 0.01s
[COMPLETE] ● https://sherwood.news/tech/meta-ray-ban-apple-visi... | Status: True | Total: 0.08s
[FETCH]... ↓ https://en.wikipedia.org/wiki/Smartglasses... | Status: True | Time: 0.01s
[COMPLETE] ● https://en.wikipedia.org/wiki/Smartglasses... | Status: True | Total: 0.05s
[FETCH]... ↓ https://

In [14]:
for page in crawled_list_all:
    index  = 0
    for sp, ct, in zip(page['snippets'], page['content']):
        print(index)
        index+=1
        assert len(sp) == len(ct)

0


AssertionError: 

In [21]:
with open('crawled_cleaned_content_v2.json', 'w') as f:
    json.dump(crawled_list_all, f, indent=2)

In [12]:
crawled_cleaned_content_list = []
for item in crawled_list_all:
    cleaned_content_list = []
    for content in item:
        cleaned_content_list.append(clean_text(str(content[2])))
    crawled_cleaned_content_list.append(cleaned_content_list)


In [None]:
snippets_data

In [None]:
cleaned_markdown = clean_text(results[0].markdown)

In [None]:
len(cleaned_markdown), len(results[0].markdown)

In [None]:
print(cleaned_markdown)

In [None]:
class SlidingWindowChunking:
    def __init__(self, window_size=400, step=350):
        self.window_size = window_size
        self.step = step

    def chunk(self, text):
        words = text.split()
        chunks = []
        for i in range(0, len(words) - self.window_size + 1, self.step):
            chunks.append(' '.join(words[i:i + self.window_size]))
        return chunks

# Example Usage
text = "This is a long text to demonstrate sliding window chunking."
chunker = SlidingWindowChunking()
print(chunker.chunk(text))

In [13]:
chunks_list = chunker.chunk(result.markdown_v2.fit_markdown)

In [None]:
len(chunks_list)

In [None]:
type(chunks_list[0])

In [22]:
import os
import httpx

from dotenv import load_dotenv

_ = load_dotenv()


async def get_source_from_pegasus(params):
    pegasus_url = os.getenv("CLOUDFUNCTION_SERVICE")
    headers = {"accept": "application/json"}
    async with httpx.AsyncClient(timeout=300) as client:
        try:
            response = await client.get(pegasus_url, headers=headers, params=params)
            response.raise_for_status()
            json_response = response.json()
            return json_response
        except httpx.HTTPStatusError as http_err:
            status_code = http_err.response.status_code
            print(f"Status code: {status_code}")
            return None
        except httpx.RequestError as req_err:
            print(f"Request error occurred: {str(req_err)}")
            return None
        except Exception as e:
            print(f"Exception: {e}")
            return None

In [23]:
query = "Biggest Venture Capital Firms in Germany"

In [None]:
pegasus_result = await get_source_from_pegasus(params={"query": query})

In [None]:
pegasus_result['query_result'][0]['link']

In [None]:
for result in pegasus_result['query_result']:
    urls_list.apend(result['link'])