## Setting All Environment Variables


In [None]:
import os
# Service credentials and endpoints; replace each "..." placeholder with a
# real value before running. QDRANT_URL and QDRANT_API_KEY are read back by
# the Qdrant indexing cell further down.
os.environ.update(
    {
        "QDRANT_URL": "...",
        "QDRANT_API_KEY": "...",
        "OPENAI_API_KEY": "...",
        "NGROK_TOKEN": "...",
    }
)

## Data collection pipeline

In [75]:
import os
import json
import time
import random
import requests
from pathlib import Path
from bs4 import BeautifulSoup as BS
from tqdm import tqdm

# Scraper configuration: site root and the headers sent with every request
BASE_URL = "https://www.deeplearning.ai"
HEADERS = {"User-Agent": "Mozilla/5.0"}

# On-disk layout: index pages and individual posts live side by side under data/raw
RAW_DIR = Path("data/raw")
PAGE_DIR = RAW_DIR / "pages"
POST_DIR = RAW_DIR / "posts"

# Make sure both output folders are present before any download starts
for directory in [PAGE_DIR, POST_DIR]:
    directory.mkdir(exist_ok=True, parents=True)

def backoff_sleep(attempt: int) -> None:
    """
    Block for an exponentially growing delay before the next retry.

    The pause is ``2 ** attempt`` seconds plus a random fraction of a second
    drawn from ``random.random()``.
    """
    base_delay = 2 ** attempt
    jitter = random.random()
    time.sleep(base_delay + jitter)

def fetch_front_page() -> tuple[str, list[dict]]:
    """
    Retrieve the build ID and list of posts from the main page's __NEXT_DATA__.

    The JSON embedded in the ``#__NEXT_DATA__`` element of ``/the-batch`` is
    parsed and also saved, pretty-printed, to ``PAGE_DIR / "page_1.json"``.

    Returns:
      - build ID (str), taken from ``data["buildId"]``
      - list of post dicts, taken from ``data["props"]["pageProps"]["posts"]``

    Raises:
      - requests.HTTPError if the page responds with an error status
      - RuntimeError if the page contains no ``#__NEXT_DATA__`` payload
    """
    url = f"{BASE_URL}/the-batch"
    response = requests.get(url, headers=HEADERS, timeout=15)
    response.raise_for_status()

    # Fail with a readable message instead of an AttributeError on None
    next_data = BS(response.text, "lxml").select_one("#__NEXT_DATA__")
    if next_data is None or not next_data.string:
        raise RuntimeError(f"No __NEXT_DATA__ payload found at {url}")

    data = json.loads(next_data.string)
    build_id = data["buildId"]
    posts = data["props"]["pageProps"]["posts"]

    # Save front page JSON. The encoding is explicit because ensure_ascii=False
    # emits raw non-ASCII characters, which the platform default may not handle.
    (PAGE_DIR / "page_1.json").write_text(
        json.dumps(data, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    return build_id, posts

def fetch_index_page(build_id: str, page: int) -> list[dict] | None:
    """
    Fetch a paginated index JSON for posts.

    The response body is also saved, pretty-printed, to
    ``PAGE_DIR / f"page_{page}.json"``.

    Returns a list of post dicts (empty when the payload has no
    ``pageProps.posts``), or None if the server answers 404, i.e. no more
    pages exist.

    Raises requests.HTTPError for any other error status.
    """
    url = f"{BASE_URL}/_next/data/{build_id}/the-batch/page/{page}.json?pageNo={page}"
    response = requests.get(url, headers=HEADERS, timeout=15)
    if response.status_code == 404:
        return None
    response.raise_for_status()
    data = response.json()

    # Save page JSON. The encoding is explicit because ensure_ascii=False
    # emits raw non-ASCII characters, which the platform default may not handle.
    (PAGE_DIR / f"page_{page}.json").write_text(
        json.dumps(data, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    return data.get("pageProps", {}).get("posts", [])

def fetch_post_json(build_id: str, slug: str) -> None:
    """
    Download the post JSON for a given slug into ``POST_DIR / f"{slug}.json"``.

    Nothing is downloaded when that file already exists, so the scrape can be
    re-run without refetching. HTTP 429 and 504 responses are retried with
    exponential backoff, up to three attempts in total; a 404 is reported and
    skipped. Any other error status raises requests.HTTPError.
    """
    output_path = POST_DIR / f"{slug}.json"
    if output_path.exists():
        return

    url = f"{BASE_URL}/_next/data/{build_id}/the-batch/{slug}.json?slug={slug}"
    max_retries = 3
    for attempt in range(max_retries):
        response = requests.get(url, headers=HEADERS, timeout=30)
        if response.status_code in (429, 504):
            # Only wait when another attempt follows; sleeping after the
            # final failure would just delay the warning below.
            if attempt < max_retries - 1:
                backoff_sleep(attempt)
            continue
        if response.status_code == 404:
            print(f"Warning: {slug} returned 404, skipping.")
            return
        response.raise_for_status()
        output_path.write_text(response.text, encoding="utf-8")
        return

    print(f"Warning: {slug} failed after {max_retries} retries due to rate limiting.")

def scrape_batch_posts():
    """
    Run the full scrape: collect every post listing, then download each post.

    Listings come from the front page plus the paginated index (starting at
    page 2) until a page is missing or empty. All listing entries are written
    to ``data/posts_meta.jsonl`` (one JSON object per line) before the
    individual post downloads begin.
    """
    # The front page supplies both the build ID and the first batch of posts
    build_id, all_posts = fetch_front_page()
    print(f"Build ID: {build_id} | Initial posts: {len(all_posts)}")

    # Keep requesting index pages while they return a non-empty list
    page_number = 2
    while posts := fetch_index_page(build_id, page_number):
        print(f"Page {page_number}: {len(posts)} posts")
        all_posts.extend(posts)
        page_number += 1
        time.sleep(0.3)

    # Persist the collected listing entries as JSON Lines
    meta_file = Path("data/posts_meta.jsonl")
    meta_file.parent.mkdir(parents=True, exist_ok=True)
    with meta_file.open("w", encoding="utf-8") as f:
        f.writelines(
            json.dumps(post, ensure_ascii=False) + "\n" for post in all_posts
        )

    # Fetch the full JSON of every listed post
    for post in tqdm(all_posts, desc="Downloading posts"):
        fetch_post_json(build_id, post["slug"])

# Run the scrape when this cell executes; files land under data/raw and data/posts_meta.jsonl.
scrape_batch_posts()

Build ID: v_7Wdn7Y_3E7b3v5FzsK6 | Initial posts: 16
Page 2: 15 posts
Page 3: 15 posts
Page 4: 15 posts
Page 5: 15 posts
Page 6: 15 posts
Page 7: 15 posts
Page 8: 15 posts
Page 9: 15 posts
Page 10: 15 posts
Page 11: 15 posts
Page 12: 15 posts
Page 13: 15 posts
Page 14: 15 posts
Page 15: 15 posts
Page 16: 15 posts
Page 17: 15 posts
Page 18: 15 posts
Page 19: 15 posts
Page 20: 15 posts
Page 21: 15 posts
Page 22: 2 posts


Downloading posts: 100%|██████████| 318/318 [07:36<00:00,  1.43s/it]


## Content Processing Pipeline

In [76]:
import re
from pathlib import Path
from typing import Iterable, Tuple, Dict, List

from bs4 import BeautifulSoup
import html2text
from tqdm import tqdm

from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Input (raw post JSON) and output (processed JSONL) locations.
# NOTE: this rebinds RAW_DIR, which the scraping cell defined as data/raw.
RAW_DIR = Path("data/raw/posts")
PROC_DIR = Path("data/processed")
PROC_DIR.mkdir(exist_ok=True, parents=True)

# Chunk budget, counted in characters (the splitter's length_function is len)
CHUNK_SIZE = 768
CHUNK_OVERLAP = 128

# Splitter with separators listed from coarsest (blank line) to finest (any character)
text_splitter = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", " ", ""],
    length_function=len,
    chunk_size=CHUNK_SIZE,
    chunk_overlap=CHUNK_OVERLAP,
)

# HTML-to-Markdown converter; body_width = 0 turns off line wrapping
md_converter = html2text.HTML2Text()
md_converter.body_width = 0

def parse_html(html: str) -> Tuple[str, List[Dict]]:
    """
    Turn a post's HTML into Markdown and collect its images.

    Images are gathered before any tags are stripped, so every ``<img>`` with
    a ``src`` attribute in the input is reported.

    Returns:
      - markdown text (str), with runs of 3+ newlines collapsed and ends trimmed
      - list of image dicts: { "url": ..., "alt": ... }
    """
    soup = BeautifulSoup(html, "lxml")

    def _with_scheme(src: str) -> str:
        # Protocol-relative URLs ("//host/path") get an explicit https scheme
        return "https:" + src if src.startswith("//") else src

    images: List[Dict] = [
        {"url": _with_scheme(tag["src"]), "alt": tag.get("alt", "")}
        for tag in soup.select("img[src]")
    ]

    # Drop tags that should not reach the Markdown output
    for unwanted in soup.find_all(["script", "style", "noscript"]):
        unwanted.decompose()

    raw_markdown = md_converter.handle(str(soup))
    markdown = re.sub(r"\n{3,}", "\n\n", raw_markdown).strip()

    return markdown, images

def article_to_documents(post_json: dict) -> Tuple[List[Document], List[Dict]]:
    """
    Convert one downloaded post JSON into indexable records.

    The post is read from ``post_json["pageProps"]["cmsData"]["post"]``.

    Returns:
      - a list of Document objects, one per text chunk, each carrying the
        post's slug, title, publish date, its chunk index and the chunk count
      - a list of image dicts ({ "slug", "url", "alt" }), with the feature
        image first when the post has one
    """
    post = post_json["pageProps"]["cmsData"]["post"]
    slug = post["slug"]

    # Fields shared by every chunk of this post
    shared_meta = {
        "slug": slug,
        "title": post["title"],
        "published": post["published_at"],
    }
    cover_url = post.get("feature_image")

    markdown, images = parse_html(post["html"])

    # The feature image, when present, goes ahead of the inline images
    if cover_url:
        cover = {"url": cover_url, "alt": post.get("feature_image_alt", "")}
        images.insert(0, cover)

    chunks = text_splitter.split_text(markdown)
    total = len(chunks)
    docs: List[Document] = [
        Document(
            page_content=chunk,
            metadata={**shared_meta, "chunk_id": idx, "n_chunks": total},
        )
        for idx, chunk in enumerate(chunks)
    ]

    # Tag every image record with the post it belongs to
    images_metadata = [{"slug": slug, **img} for img in images]
    return docs, images_metadata

def iter_post_files() -> Iterable[Path]:
    """
    Return all JSON files in the raw posts directory, sorted by path.

    Sorting makes the processing order, and therefore the line order of the
    generated JSONL files, independent of the order the filesystem happens to
    list directory entries in.
    """
    return sorted(RAW_DIR.glob("*.json"))

# Destination files: one JSON line per text chunk / per image record
docs_output = PROC_DIR / "chunks.jsonl"
images_output = PROC_DIR / "images.jsonl"

# Convert every raw post and stream its chunks and images to the two files
with (
    docs_output.open("w", encoding="utf-8") as doc_file,
    images_output.open("w", encoding="utf-8") as img_file,
):
    # The file list is materialised so tqdm knows the total up front
    for post_file in tqdm(list(iter_post_files()), desc="Processing posts"):
        post_data = json.loads(post_file.read_text(encoding="utf-8"))
        docs, imgs = article_to_documents(post_data)

        doc_file.writelines(
            json.dumps(doc.model_dump(), ensure_ascii=False) + "\n" for doc in docs
        )
        img_file.writelines(
            json.dumps(img, ensure_ascii=False) + "\n" for img in imgs
        )

Processing posts: 100%|██████████| 316/316 [00:06<00:00, 51.18it/s]


## Indexing into Qdrant

In [77]:
import itertools
import uuid
import io
from pathlib import Path
from functools import lru_cache

import torch
import requests
from PIL import Image, ImageSequence
from tqdm import tqdm
from sentence_transformers import SentenceTransformer
from transformers import pipeline
from qdrant_client import QdrantClient
from qdrant_client.http import models as qmodels

# Paths
DATA_DIR = Path("data/processed")
CHUNKS = DATA_DIR / "chunks.jsonl"
IMAGES = DATA_DIR / "images.jsonl"

# Qdrant Cloud config. Checked before the models are loaded so that missing
# credentials fail immediately, and with an explicit raise because assert
# statements are removed when Python runs with -O.
QDRANT_URL = os.getenv("QDRANT_URL")
QDRANT_KEY = os.getenv("QDRANT_API_KEY")
if not (QDRANT_URL and QDRANT_KEY):
    raise RuntimeError("Set QDRANT_URL and QDRANT_API_KEY environment variables")

COLLECTION = "batch_multimodal"
client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_KEY, timeout=60)

# Device & Models
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

TXT_MODEL = SentenceTransformer("intfloat/e5-base-v2", device=DEVICE)
CLIP_MODEL = SentenceTransformer("clip-ViT-B-32", device=DEVICE)

# Fall back to fixed sizes if a model does not report its embedding dimension
TXT_DIM = TXT_MODEL.get_sentence_embedding_dimension() or 768
IMG_DIM = CLIP_MODEL.get_sentence_embedding_dimension() or 512

CAPTIONER_DEVICE = 0 if torch.cuda.is_available() else -1  # -1 → CPU

@lru_cache()
def captioner():
    """
    Return the BLIP image-to-text pipeline.

    The pipeline is built on the first call and the cached instance is handed
    back on every later call.
    """
    blip_pipeline = pipeline(
        "image-to-text",
        device=CAPTIONER_DEVICE,
        model="Salesforce/blip-image-captioning-base",
    )
    return blip_pipeline

def generate_caption(url: str) -> str:
    """
    Download an image (handles GIFs by selecting the first frame),
    then generate and return a caption via BLIP.

    This is best-effort: any failure (download, decoding, captioning) is
    reported with a printed warning and an empty string is returned, so one
    bad image never aborts a whole indexing run.
    """
    try:
        resp = requests.get(url, timeout=10)
        resp.raise_for_status()

        # The context manager releases the decoder's resources once the RGB
        # copy has been made.
        with Image.open(io.BytesIO(resp.content)) as source:
            # If animated or GIF, take first frame
            animated = getattr(source, "is_animated", False) or source.format == "GIF"
            frame = next(ImageSequence.Iterator(source)) if animated else source
            rgb_image = frame.convert("RGB")

        result = captioner()(rgb_image, max_new_tokens=30)[0]
        return result.get("generated_text", "").strip()
    except Exception as exc:
        # Broad on purpose (best-effort boundary), but no longer silent.
        print(f"Warning: could not caption {url}: {exc!r}")
        return ""

def safe_caption(meta: dict) -> str:
    """
    Pick a caption for an image record.

    The stripped alt text is returned when it contains at least two
    whitespace-separated words; otherwise a caption is generated from
    ``meta["url"]`` via BLIP.
    """
    alt_text = (meta.get("alt") or "").strip()
    has_usable_alt = len(alt_text.split()) >= 2
    return alt_text if has_usable_alt else generate_caption(meta["url"])

def batched(iterable, n: int):
    """
    Generate consecutive lists of up to n items drawn from iterable.

    Every list has exactly n items except possibly the last one; nothing is
    yielded for an empty iterable.
    """
    source = iter(iterable)
    while chunk := list(itertools.islice(source, n)):
        yield chunk

# Index Text Chunks
def index_text(client: QdrantClient):
    """
    Read text chunks, compute embeddings, and upsert into Qdrant.

    Chunks are read from the CHUNKS JSONL file (blank lines are ignored),
    embedded with TXT_MODEL in batches of 256, and stored under the "text"
    named vector of COLLECTION. Each point's payload holds the chunk metadata,
    the chunk text as "page_content", and "doc_type": "text".

    Point IDs are random UUIDs, so running this twice inserts every chunk twice.
    """
    batch_size = 256

    with CHUNKS.open(encoding="utf-8") as f:
        docs = [json.loads(line) for line in f if line.strip()]

    # Ceiling division: the progress total equals the number of batches that
    # batched() really yields, even when len(docs) is a multiple of batch_size.
    n_batches = -(-len(docs) // batch_size)

    for batch in tqdm(batched(docs, batch_size), total=n_batches, desc="Text"):
        texts = [doc["page_content"] for doc in batch]
        # NOTE(review): E5-family models are usually fed "passage: "/"query: "
        # prefixed text — confirm what the query side sends before changing this.
        embeds = TXT_MODEL.encode(texts, batch_size=32, show_progress_bar=False)

        points = [
            qmodels.PointStruct(
                id=str(uuid.uuid4()),
                vector={"text": vec.tolist()},
                payload={
                    **doc.get("metadata", {}),
                    "page_content": doc["page_content"],
                    "doc_type": "text",
                },
            )
            for doc, vec in zip(batch, embeds)
        ]

        client.upsert(collection_name=COLLECTION, points=points)

# Index Images
def index_images(client: QdrantClient):
    """
    Read image metadata, generate captions, compute embeddings, and upsert into Qdrant.

    Records are read from the IMAGES JSONL file (blank lines are ignored) and
    handled in batches of 128. Each image is captioned with safe_caption();
    images whose caption comes back empty are skipped. The caption text (not
    the image pixels) is embedded with CLIP_MODEL and stored under the "image"
    named vector of COLLECTION, with the slug, image URL and caption as payload.

    Point IDs are random UUIDs, so running this twice inserts every image twice.
    """
    batch_size = 128

    with IMAGES.open(encoding="utf-8") as f:
        imgs = [json.loads(line) for line in f if line.strip()]

    # Ceiling division: the progress total equals the number of batches that
    # batched() really yields, even when len(imgs) is a multiple of batch_size.
    n_batches = -(-len(imgs) // batch_size)

    for batch in tqdm(batched(imgs, batch_size), total=n_batches, desc="Img"):
        # Keep only the images for which a non-empty caption is available
        captioned = [(meta, cap) for meta in batch if (cap := safe_caption(meta))]
        if not captioned:
            continue

        captions = [cap for _, cap in captioned]
        embeds = CLIP_MODEL.encode(captions, batch_size=32, show_progress_bar=False)

        points = [
            qmodels.PointStruct(
                id=str(uuid.uuid4()),
                vector={"image": vec.tolist()},
                payload={
                    "slug": meta["slug"],
                    "img_url": meta["url"],
                    "caption": cap,
                    "doc_type": "image",
                },
            )
            for (meta, cap), vec in zip(captioned, embeds)
        ]

        client.upsert(collection_name=COLLECTION, points=points)

In [78]:
# Create the collection if it doesn't exist
existing = [c.name for c in client.get_collections().collections]
if COLLECTION not in existing:
    # Absence was just checked, so a plain create is all that is needed;
    # recreate_collection is deprecated and would delete an existing
    # collection before creating it.
    client.create_collection(
        collection_name=COLLECTION,
        vectors_config={
            # One named vector per modality, both compared by cosine distance
            "text": qmodels.VectorParams(size=int(TXT_DIM), distance=qmodels.Distance.COSINE),
            "image": qmodels.VectorParams(size=int(IMG_DIM), distance=qmodels.Distance.COSINE),
        },
    )

# Populate the collection: image captions first, then text chunks
index_images(client)
index_text(client)

print("✅ Qdrant Cloud population completed.")

Img:   0%|          | 0/19 [00:00<?, ?it/s]Device set to use cuda:0
Img: 100%|██████████| 19/19 [05:09<00:00, 16.29s/it]
Text: 100%|██████████| 62/62 [06:07<00:00,  5.92s/it]

✅ Qdrant Cloud population completed.





## Streamlit Demo

In [75]:
from pyngrok import ngrok

# Open an ngrok tunnel to local port 8501, the port Streamlit is told to serve on below.
# NOTE(review): NGROK_TOKEN is set in the environment cell but is not passed to
# ngrok anywhere in the visible code — confirm the auth token is configured.
public_url = ngrok.connect(8501)
print("Streamlit URL:", public_url)

# IPython shell escape: start the Streamlit app in the background (trailing "&"),
# headless, listening on all interfaces, with the file watcher disabled.
!streamlit run streamlit_app.py \
    --server.port 8501 \
    --server.address 0.0.0.0 \
    --server.headless true \
    --server.fileWatcherType none &