In [0]:
%pip install -r requirements.txt # Original had -U
dbutils.library.restartPython()  # restart Python so the packages installed above are importable in later cells

In [0]:
import os
import requests
import markdownify
import yaml

import pandas as pd
import xml.etree.ElementTree as ET
import databricks.sdk.service.catalog as c

from bs4 import BeautifulSoup
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
from concurrent.futures import ThreadPoolExecutor, as_completed

In [0]:
MAX_CONTENT_SIZE = 5 * 1024 * 1024  # 5MB per document; bigger pages are skipped as "TOO_LARGE"
MAX_BATCH_SIZE = 100 * 1024 * 1024  # Flush every 100MB of buffered Markdown
URL_PARALLELISM = 50  # download threads; also used as the HTTP connection-pool size

MAX_DOCUMENTS = None  # set to an int to crawl only the first N sitemap URLs (None = all)
DATABRICKS_SITEMAP_URL = r"https://docs.databricks.com/aws/en/sitemap.xml"

with open("names.yaml", "r", encoding="utf-8") as file:
    # An empty YAML file loads as None; treat it as "no keys" so the check below fires.
    names = yaml.safe_load(file) or {}
TABLE_NAME = names.get("table_name")
# Fail fast: a missing name would otherwise be interpolated as the literal table name "None".
if not TABLE_NAME:
    raise ValueError("names.yaml must define a non-empty 'table_name'")

In [0]:
# Create the target Delta table if it does not exist yet. `id` is an identity
# column, so writers only supply `url` and `content`; Change Data Feed is enabled
# via the table property. Uses TABLE_NAME so every cell targets the same table.
spark.sql(f"""
  CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
    id BIGINT GENERATED BY DEFAULT AS IDENTITY,
    url STRING,
    content STRING
  ) TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

## Download the Databricks documentation

In [0]:
def fetch_and_parse(url, session):
    """Fetch one documentation page and convert its ``<article>`` element to Markdown.

    Args:
        url: Page URL to download.
        session: HTTP session used for the GET request.

    Returns:
        A ``(url, value)`` tuple where ``value`` is:
          * the Markdown string on success;
          * the integer HTTP status code if the response status is not 200;
          * ``"TOO_LARGE"`` if the raw body, the article HTML or the Markdown
            exceeds ``MAX_CONTENT_SIZE`` bytes;
          * ``None`` if the page has no ``<article>`` element.

    Exceptions raised by ``session.get`` or while streaming the body are not caught here.
    """
    # ``stream=True`` lets us stop downloading oversized pages early. The ``with``
    # block closes the response on every exit path, so the early returns below no
    # longer leave the pooled connection unreleased.
    with session.get(url, stream=True, timeout=10) as resp:
        if resp.status_code != 200:
            return url, resp.status_code
        # bytearray grows in amortized O(1); ``bytes += chunk`` recopied the whole body each time.
        body = bytearray()
        for chunk in resp.iter_content(chunk_size=8192):
            body.extend(chunk)
            if len(body) > MAX_CONTENT_SIZE:
                return url, "TOO_LARGE"

    soup = BeautifulSoup(bytes(body), "html.parser")
    article = soup.find("article")
    if not article:
        return url, None
    # The size limit is re-checked after each transformation, since prettify/markdownify can grow the text.
    html_str = article.prettify()
    if len(html_str.encode("utf-8")) > MAX_CONTENT_SIZE:
        return url, "TOO_LARGE"
    markdown = markdownify.markdownify(html_str, heading_style="ATX")
    if len(markdown.encode("utf-8")) > MAX_CONTENT_SIZE:
        return url, "TOO_LARGE"
    return url, markdown

def fetch_urls(session, max_documents=None, sitemap_url=None):
    """Return the page URLs listed in the documentation sitemap.

    Args:
        session: HTTP session used to download the sitemap.
        max_documents: if truthy, only the first ``max_documents`` URLs are
            returned; ``None`` (or 0) returns all of them.
        sitemap_url: sitemap to read; defaults to ``DATABRICKS_SITEMAP_URL``.

    Returns:
        List of URL strings taken from the sitemap's ``<loc>`` elements, in
        document order, with surrounding whitespace stripped and empty entries dropped.

    Raises:
        Whatever ``response.raise_for_status()`` raises for an HTTP error status
        (``requests.HTTPError`` for a requests response), and
        ``xml.etree.ElementTree.ParseError`` if the body is not valid XML.
    """
    if sitemap_url is None:
        sitemap_url = DATABRICKS_SITEMAP_URL
    # Without a timeout a stalled server would hang the notebook indefinitely.
    response = session.get(sitemap_url, timeout=30)
    # Surface the HTTP error instead of a confusing XML ParseError on an error page.
    response.raise_for_status()
    root = ET.fromstring(response.content)
    loc_tag = ".//{http://www.sitemaps.org/schemas/sitemap/0.9}loc"
    urls = []
    for loc in root.findall(loc_tag):
        text = (loc.text or "").strip()
        if text:
            urls.append(text)
    return urls[:max_documents] if max_documents else urls

In [0]:
# Accumulators used by the download loop.
pending_results = []
accumulated_size = 0
total_processed = 0
total_skipped = 0
chunk_idx = 0

# One pooled HTTP session shared by all download workers. HTTP 429 responses are
# retried up to 3 times with backoff, and the pool is sized to the worker count.
retries = Retry(total=3, backoff_factor=3, status_forcelist=[429])
adapter = HTTPAdapter(max_retries=retries, pool_maxsize=URL_PARALLELISM)
shared_session = requests.Session()
shared_session.mount("http://", adapter)
shared_session.mount("https://", adapter)

print(f'Downloading Databricks documentation to {TABLE_NAME}, this can take a few minutes...')
urls = fetch_urls(shared_session, MAX_DOCUMENTS)

In [0]:
def flush_to_delta(data):
    """Write one batch of fetched pages to the docs Delta table.

    Args:
        data: iterable of ``(url, content)`` pairs as produced by ``fetch_and_parse``.
            Pairs whose content is not a Markdown string (``None``, an int HTTP
            status code, or the ``"TOO_LARGE"`` sentinel) are dropped.

    The first non-empty batch while ``chunk_idx == 0`` overwrites the table's rows;
    later batches append. ``chunk_idx`` is only incremented when rows were written.
    """
    global chunk_idx
    # Keep only real documents; ints would otherwise slip past a null check and
    # break the string column.
    rows = [
        (url, content)
        for url, content in data
        if isinstance(content, str) and content != "TOO_LARGE"
    ]
    if not rows:
        return
    # Explicit schema instead of type inference. `id` is not supplied; it is
    # expected to be filled by the table's identity column.
    spark_df = spark.createDataFrame(rows, schema="url STRING, content STRING")
    mode = "overwrite" if chunk_idx == 0 else "append"
    (spark_df.write
            .mode(mode)
            .format("delta")
            .option("mergeSchema", "true")
            .saveAsTable(TABLE_NAME))
    chunk_idx += 1

# Reset per-run state so re-running only this cell starts a clean run: the first
# flush overwrites the table again, and rows left over from a previous run's final
# flush are not written a second time.
MAX_BATCH_ROWS = 250  # also flush after this many buffered rows, even below MAX_BATCH_SIZE
pending_results = []
accumulated_size, total_processed, total_skipped, chunk_idx = 0, 0, 0, 0

with ThreadPoolExecutor(max_workers=URL_PARALLELISM) as executor:

    futures = {executor.submit(fetch_and_parse, url, shared_session): url for url in urls}
    for future in as_completed(futures):
        try:
            url, text = future.result()
        except requests.RequestException as exc:
            # One unreachable page (timeout, exhausted retries, ...) is skipped
            # instead of aborting the whole crawl.
            url, text = futures[future], type(exc).__name__
            is_document = False
        else:
            # fetch_and_parse returns Markdown on success, otherwise an int HTTP
            # status, None or "TOO_LARGE"; only non-empty strings are documents.
            is_document = isinstance(text, str) and bool(text) and text != "TOO_LARGE"

        if is_document:
            accumulated_size += len(text.encode("utf-8"))
            pending_results.append((url, text))
        else:
            print(f"SKIPPED ({text}): {url}")
            total_skipped += 1

        if accumulated_size >= MAX_BATCH_SIZE or len(pending_results) >= MAX_BATCH_ROWS:
            print(f"Flushing {len(pending_results)} rows (~{accumulated_size / 1024 / 1024:.1f} MB)")
            flush_to_delta(pending_results)
            pending_results = []
            accumulated_size = 0
        total_processed += 1

# Final flush of whatever is still buffered; clear the buffer afterwards so the
# same rows can never be flushed twice.
if pending_results:
    print(f"Final flush with {len(pending_results)} rows")
    flush_to_delta(pending_results)
    pending_results = []
    accumulated_size = 0
print(f'Total processed: {total_processed}')
print(f'Total skipped: {total_skipped}')