# Website Crawling and Data Extraction

In this section, we use `crawl4ai` to crawl one or more websites. The extracted content is saved as Markdown (`.md`) files, or as cleaned HTML (`.html`) files when no Markdown is available, in an output folder (e.g., `./crawled_data`).

You can enter the URLs interactively using the provided widget, one URL per line. Alternatively, you can skip the widget and set the `urls` list directly in the code.


In [1]:
# --- Set Windows Proactor Event Loop (for Windows users) ---
# On Windows only, switch asyncio to the Proactor event loop policy; on every
# other platform this cell changes nothing.
# NOTE(review): presumably required because the browser automation behind the
# crawler launches subprocesses, which the Proactor loop supports on Windows —
# confirm against the crawl4ai setup notes.
import sys
import asyncio
if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

In [7]:
# nest_asyncio patches asyncio so nested event loops work inside Jupyter Notebook.
# If you do not need it, comment out the next two lines:
import nest_asyncio
nest_asyncio.apply()

# Imports used for Crawl4AI part
import asyncio
import threading
import ipywidgets as widgets
from IPython.display import display
import os
import json
import hashlib
from urllib.parse import urlparse
from urllib.parse import urlparse, urljoin

# Import Crawl4AI classes based on the documentation
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode

In [8]:
# --- URL Input Widget ---
# Multi-line text box, pre-filled with two example sites; enter one URL per line.
url_input = widgets.Textarea(
    description='URLs:',
    placeholder='Enter one URL per line',
    value='https://example.com\nhttps://crawl4ai.com/',
    layout=widgets.Layout(height='80px', width='100%'),
)
# Render the widget so its value can be edited before the crawl cells run.
display(url_input)

Textarea(value='https://example.com\nhttps://crawl4ai.com/', description='URLs:', layout=Layout(height='80px',…

In [9]:
def get_safe_filename(url: str, extension: str = ".md") -> str:
    """
    Build a filesystem-friendly filename for a URL.

    The name is assembled from the URL's host, its path ("index" when the
    path is empty) and, when present, its fragment, joined with underscores.
    Every character that is not alphanumeric or one of ".", "_", "-" is
    replaced by "_". The first 8 hex digits of the MD5 of the full URL are
    appended so that URLs differing only in parts that are not visible in
    the name (for example the query string) still get different filenames.

    Examples:
      - "https://crawl4ai.com/" -> "crawl4ai.com_index_<hash>.md"
      - "https://crawl4ai.com/#quick-start" -> "crawl4ai.com_index_quick-start_<hash>.md"
    """
    def _sanitize(text: str) -> str:
        # Keep alphanumerics plus ".", "_" and "-"; everything else becomes "_".
        return "".join(ch if ch.isalnum() or ch in "._-" else "_" for ch in text)

    parts = urlparse(url)

    # Host (with any port separator replaced) followed by the path, or "index"
    # when the URL points at the site root.
    pieces = [parts.netloc.replace(":", "_"), parts.path.strip("/") or "index"]

    # The fragment is optional; it is only added when non-empty.
    anchor = parts.fragment.strip()
    if anchor:
        pieces.append(_sanitize(anchor))

    stem = _sanitize("_".join(pieces))

    # Short digest of the complete URL keeps otherwise identical stems apart.
    digest = hashlib.md5(url.encode("utf-8")).hexdigest()[:8]

    return f"{stem}_{digest}{extension}"

In [13]:
def normalize_content(content: str) -> str:
    """
    Strip a wrapping Markdown code fence (```) from ``content``.

    If the text, ignoring surrounding whitespace, both starts and ends with
    a triple-backtick fence, the fences are removed and the inner text is
    returned with surrounding whitespace stripped:

      - Multi-line input: the whole opening line (the fence plus any language
        hint such as "json") is dropped, and only the closing fence itself is
        removed, so text that precedes the closing fence on the same line is
        kept.
      - Single-line input (both fences on one line): the text between the two
        fences is returned; no language hint is assumed in this form.

    Text that is not wrapped in fences is returned unchanged, including its
    original surrounding whitespace.
    """
    fence = "```"
    stripped = content.strip()
    if not (stripped.startswith(fence) and stripped.endswith(fence)):
        return content

    lines = stripped.splitlines()
    if len(lines) == 1:
        only_line = lines[0]
        # A run of 3-5 backticks cannot hold two separate fences, so there is
        # no inner content to return.
        if len(only_line) < 2 * len(fence):
            return ""
        return only_line[len(fence):-len(fence)].strip()

    # Drop the opening line: it holds the fence and, optionally, a language hint.
    body = lines[1:]
    # ``stripped`` ends with the fence, so the final line does too. Remove just
    # the fence instead of the whole line so content sharing that line survives.
    body[-1] = body[-1][:-len(fence)]
    return "\n".join(body).strip()

def is_not_found_content(content: str) -> bool:
    """
    Tell whether ``content`` is a JSON "Not Found" payload.

    The text is first passed through ``normalize_content`` (which removes a
    wrapping code fence) and then parsed as JSON. The result is True only when
    the parsed value is a dict whose "detail" entry is a string equal to
    "not found", ignoring case and surrounding whitespace.

    Anything else returns False: non-string input (for example ``None``),
    text that is not valid JSON, JSON that is not an object, or a "detail"
    value that is missing or not a string.
    """
    # Guard first: the normalisation step calls string methods on its argument.
    if not isinstance(content, str):
        return False

    normalized = normalize_content(content)
    try:
        data = json.loads(normalized)
    except (ValueError, RecursionError):
        # json.JSONDecodeError is a ValueError; ordinary page text lands here.
        # RecursionError covers pathologically nested input.
        return False

    if not isinstance(data, dict):
        return False

    detail = data.get("detail")
    return isinstance(detail, str) and detail.strip().lower() == "not found"

In [11]:
# --- Convert Widget Content to a URL List ---
# Trim every line and drop blank ones so that stray whitespace or empty lines
# in the text box are never handed to the crawler as URLs.
urls = [line.strip() for line in url_input.value.splitlines() if line.strip()]
print("List of URLs to crawl:", urls)

# --- Set Up Browser Configuration ---
browser_cfg = BrowserConfig(
    browser_type="chromium",
    headless=True,
    verbose=True,
    viewport_width=1280,
    viewport_height=720,
)

# CrawlerRunConfig is set up to:
# - Exclude external links (only internal links remain in result.links)
run_cfg = CrawlerRunConfig(
    cache_mode=CacheMode.BYPASS,
    word_count_threshold=15,
    exclude_external_links=True,
    stream=True,  # For efficient processing when using arun_many()/arun()
)

# --- Create output folder for crawled data ---
# The path is relative to the notebook's working directory; an existing folder is reused.
output_folder = "crawled_data"
os.makedirs(output_folder, exist_ok=True)

# --- Define the Recursive Crawling Function ---
async def crawl_recursive(crawler, url, visited, depth, max_depth):
    """
    Crawl ``url``, save its content, then follow its same-host internal links.

    The page is fetched with ``crawler.arun(url, config=run_cfg)``. Its
    markdown is written to ``output_folder`` as a ``.md`` file; when no
    markdown is available the cleaned HTML is written as ``.html`` instead.
    Content recognised by ``is_not_found_content`` is not saved. Internal
    links whose host matches the host of ``url`` are then crawled the same
    way, one after another, with ``depth + 1``.

    Args:
        crawler: Open crawler object providing the ``arun`` coroutine.
        url: Absolute URL of the page to crawl.
        visited: Set of URLs already handled; updated in place so that no URL
            is crawled twice.
        depth: Link distance of ``url`` from its start URL (0 for a start URL).
        max_depth: Largest depth that is still crawled.

    Returns:
        None. Progress and failures are reported with ``print``.
    """
    if depth > max_depth or url in visited:
        return
    visited.add(url)

    # Perform the crawl for a single URL
    result = await crawler.arun(url, config=run_cfg)
    if not result.success:
        print(f"Failed to crawl URL: {url}")
        print(f"Error: {result.error_message}")
        return

    print(f"Crawled URL: {result.url} at depth {depth}")

    # Prefer markdown: either a plain string or an object carrying raw_markdown.
    md_content = None
    markdown = result.markdown
    if markdown:
        if isinstance(markdown, str):
            md_content = markdown
        elif hasattr(markdown, "raw_markdown"):
            md_content = markdown.raw_markdown
    else:
        # NOTE(review): markdown_v2 looks like a legacy field that may be
        # missing on some crawl4ai versions; getattr keeps this fallback from
        # raising in that case — confirm against the installed version.
        markdown_v2 = getattr(result, "markdown_v2", None)
        if markdown_v2 and hasattr(markdown_v2, "raw_markdown"):
            md_content = markdown_v2.raw_markdown

    # Pick what will be written: markdown when available, cleaned HTML otherwise.
    if md_content:
        content, extension, label = md_content, ".md", "markdown"
    else:
        content, extension, label = result.cleaned_html or "", ".html", "cleaned HTML"

    # Check the content that would actually be saved (the HTML fallback used
    # to test the empty markdown value, which could be None).
    if is_not_found_content(content):
        print(f"Skipping saving {label} for {url} because it indicates Not Found.")
    else:
        filename = get_safe_filename(url, extension=extension)
        file_path = os.path.join(output_folder, filename)
        with open(file_path, "w", encoding="utf-8") as f:
            f.write(content)
        print(f"Saved {label} to {file_path}")

    # Children would be at depth + 1 and rejected immediately, so skip the scan.
    if depth >= max_depth:
        return

    # result.links is expected to be a dict that may contain an "internal" key;
    # tolerate it being empty or None.
    internal_links = (result.links or {}).get("internal", [])
    current_host = urlparse(url).netloc
    for link_info in internal_links:
        href = link_info.get("href")
        if not href:
            continue
        # Convert relative URLs to absolute using the current URL as base
        absolute_url = urljoin(url, href)
        # Only follow links that stay on the same host as the current page
        if urlparse(absolute_url).netloc == current_host:
            await crawl_recursive(crawler, absolute_url, visited, depth + 1, max_depth)

# --- Main Recursive Crawling Function ---
async def crawl_main(start_urls, max_depth=2):
    """
    Crawl every URL in ``start_urls`` using a single ``AsyncWebCrawler``.

    One ``visited`` set is shared by all start URLs, so a page reachable from
    several of them is only crawled once. The start URLs are processed in
    order, each beginning at depth 0 and descending at most ``max_depth``
    levels through ``crawl_recursive``.
    """
    seen = set()
    crawler_context = AsyncWebCrawler(config=browser_cfg)
    async with crawler_context as crawler:
        for start_url in start_urls:
            await crawl_recursive(crawler, start_url, seen, depth=0, max_depth=max_depth)
            
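# --- Run the Recursive Crawler ---
# Alternative to the threaded run in the next cell: await the crawl directly
# from a notebook cell.
# await crawl_main(urls, max_depth=2)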

List of URLs to crawl: ['https://crawl4ai.com/']




In [None]:
def run_crawl_in_thread():
    """Run the full crawl of ``urls`` (up to depth 4) to completion via ``asyncio.run``."""
    crawl_job = crawl_main(urls, max_depth=4)
    asyncio.run(crawl_job)

# Execute the crawl on a separate thread and wait here until it has finished.
# NOTE(review): presumably done so the crawl gets its own event loop instead of
# reusing the notebook's already-running one — confirm.
t = threading.Thread(target=run_crawl_in_thread)
t.start()
t.join()