In [26]:
import os
import json
import time
import hashlib
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Optional, Callable, Any
import numpy as np
from sentence_transformers import SentenceTransformer

In [27]:
# document.py
class Document:
    """A piece of text to embed, bundled with its metadata and identifier."""

    def __init__(self, pageContent: str, metadata: dict, id: str):
        # Identifier of the originating PDF.
        self.id = id
        # PDF metadata (author, date, ...).
        self.metadata: dict = metadata
        # Raw text extracted from the PDF.
        self.pageContent = pageContent

In [56]:
# Configs
# Root for all notebook artefacts: the directory the kernel was started in.
THIS = Path.cwd()
# Directory where embedding JSON files are written and read.
STORAGE_DIR = THIS.joinpath("embedding")
# Every segment must be relative: a segment with a leading "/" (e.g. "/models")
# makes pathlib discard the cwd prefix, so the model would be looked up at the
# filesystem/drive root instead of inside the project directory.
MODEL_PATH = THIS.joinpath("models", "Q4_K_M.gguf")

In [55]:
class EmbeddingAdapter:
    """Base interface that every embedding backend implements."""

    def embed_texts(self, text: List[str]) -> List[np.ndarray]:
        """Turn a list of strings into a list of vectors; concrete adapters override this."""
        message = "Must be implemented by sub-class"
        raise NotImplementedError(message)
    
class DummyAdapter(EmbeddingAdapter):
    """Fake embedding backend for tests: identical text always yields the identical vector."""

    def __init__(self, dim: int = 384):
        self.dim = dim

    def embed_texts(self, texts: List[str]) -> List[np.ndarray]:
        """Return one pseudo-random, L2-normalised vector of length ``self.dim`` per text."""
        return [self._vector_for(entry) for entry in texts]

    def _vector_for(self, entry: str) -> np.ndarray:
        """Derive a reproducible vector from the SHA-256 digest of ``entry``."""
        digest_prefix = hashlib.sha256(entry.encode("utf-8")).hexdigest()[:16]
        # Fold the 16-hex-digit prefix into a seed below 2**31 - 1.
        generator = np.random.RandomState(seed=int(digest_prefix, 16) % (2**31 - 1))
        raw = generator.normal(size=(self.dim,))
        length = np.linalg.norm(raw)
        # Leave an all-zero vector untouched rather than dividing by zero.
        unit = raw / length if length > 0 else raw
        return unit.astype(float)
    
class SentenceTransformerAdapter(EmbeddingAdapter):
    """Embedding backend that delegates to a ``SentenceTransformer`` model."""

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        print(f"Loading SentenceTransformer model: {model_name}")
        self.model = SentenceTransformer(model_name)
        print("✓ Model loaded successfully")

    def embed_texts(self, texts: List[str]) -> List[np.ndarray]:
        """Encode ``texts`` with the model and return one float vector per text."""
        encoded = self.model.encode(texts, show_progress_bar=False)
        is_matrix = isinstance(encoded, np.ndarray) and encoded.ndim == 2
        if not is_matrix:
            # Anything else is treated as an iterable of per-text vectors.
            return [np.asarray(item, dtype=float) for item in encoded]
        # 2-D array: hand back one row per input text.
        return [row.astype(float) for row in encoded]
    
class LlamaCppAdapter(EmbeddingAdapter):
    """Placeholder adapter for llama-cpp-python; not implemented yet.

    Args:
        model_name: Label of the model (defaults to "Q4_K_M").
        model_path: Location of the model file (defaults to MODEL_PATH).

    Raises:
        NotImplementedError: always, on construction.
    """

    def __init__(self, model_name: str = "Q4_K_M", model_path = MODEL_PATH):
        # The exception must be raised, not merely named: a bare
        # `NotImplementedError` expression is a no-op and would let callers
        # construct an adapter that cannot embed anything.
        raise NotImplementedError("LlamaCppAdapter is not implemented yet")
            

In [30]:
# Utils
def stable_id_from_text(text: str, prefix: str = "doc"):
    """Build a content-derived id: ``<prefix>_`` followed by the first 16 hex chars of the text's SHA-256."""
    digest = hashlib.sha256(text.encode("utf-8"))
    short_hash = digest.hexdigest()[:16]
    return "{}_{}".format(prefix, short_hash)

def chunked(iterable: List[Any], batch_size: int):
    """Yield consecutive slices of ``iterable`` holding at most ``batch_size`` items.

    Args:
        iterable: A sequence supporting ``len()`` and slicing.
        batch_size: Maximum number of items per yielded slice; must be >= 1.

    Yields:
        Slices of ``iterable`` in order; the last one may be shorter.

    Raises:
        ValueError: if ``batch_size`` is less than 1. Because this is a
            generator, the error surfaces when iteration starts.
    """
    # A negative step would make range() empty and silently drop every item,
    # and a zero step fails with an unrelated-looking range() error.
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")
    for start in range(0, len(iterable), batch_size):
        yield iterable[start:start + batch_size]

def ensure_storage_dir():
    """Create STORAGE_DIR, including missing parents; do nothing if it already exists."""
    target = STORAGE_DIR
    target.mkdir(exist_ok=True, parents=True)

In [31]:
# Init the embedding model

def initializeEmbeddingModel(use_dummy: bool = True, **kwargs) -> EmbeddingAdapter:
    """Build the embedding adapter used by the examples.

    Args:
        use_dummy: When true, return a ``DummyAdapter``; otherwise a
            ``SentenceTransformerAdapter``.
        **kwargs: With ``use_dummy=True`` these are forwarded to
            ``DummyAdapter`` unchanged. With ``use_dummy=False`` only
            ``model_name`` is used; any other keyword is ignored.

    Returns:
        The constructed adapter.
    """
    if use_dummy:
        return DummyAdapter(**kwargs)
    # Honour a caller-supplied model name instead of discarding every keyword;
    # the fallback is the model that used to be hard-coded here.
    model_name = kwargs.get("model_name", "all-MiniLM-L6-v2")
    return SentenceTransformerAdapter(model_name=model_name)

In [32]:
def generate_embeddings(adapter: EmbeddingAdapter, documents: List[Document],
                        batch_size: int = 8,
                        on_progress: Optional[Callable[[int, int], None]] = None) -> List[Dict[str, Any]]:
    """Embed ``documents`` in batches and return one record per document.

    Args:
        adapter: Backend whose ``embed_texts`` turns a list of strings into vectors.
        documents: Documents to embed; ``pageContent`` is the text that is embedded.
        batch_size: Number of documents passed to the adapter per call.
        on_progress: Optional callback invoked after every document with
            ``(processed_so_far, total)``.

    Returns:
        A list of dicts with keys ``id``, ``content``, ``metadata``,
        ``embedding`` (list of floats) and ``timestamp`` (milliseconds since
        the epoch), in the same order as ``documents``.

    Raises:
        ValueError: if the adapter returns a different number of vectors than
            the number of texts it was given.
    """
    ensure_storage_dir()
    records: List[Dict[str, Any]] = []
    total = len(documents)

    for batch in chunked(documents, batch_size):
        vectors = adapter.embed_texts([doc.pageContent for doc in batch])

        # zip() would silently truncate to the shorter side, dropping documents
        # (or vectors) without any indication that something went wrong.
        if len(vectors) != len(batch):
            raise ValueError(
                f"Adapter returned {len(vectors)} vectors for {len(batch)} texts"
            )

        for doc, vec in zip(batch, vectors):
            # Documents without an id get one derived from their content.
            rec_id = doc.id if getattr(doc, "id", None) else stable_id_from_text(doc.pageContent)
            records.append({
                "id": rec_id,
                "content": doc.pageContent,
                "metadata": doc.metadata,
                # Plain Python floats so the record is JSON-serialisable.
                "embedding": [float(x) for x in np.asarray(vec).tolist()],
                "timestamp": int(time.time() * 1000)
            })

            if on_progress:
                on_progress(len(records), total)

    return records

In [60]:
def save_embeddings_json(embeddings: List[Dict[str, Any]], filename: str = "embeddings_json") -> Dict[str, Any]:
    """Write ``embeddings`` plus a small header to ``STORAGE_DIR/<filename>`` as JSON.

    Args:
        embeddings: Records to store. The vector of the first record is read
            from its ``"embedding"`` key, or ``"vector"`` if that is absent,
            to fill in the ``dimensions`` header field.
        filename: File name inside STORAGE_DIR (default ``"embeddings_json"``).

    Returns:
        ``{"filepath": <str path>, "size": <bytes on disk>}``.
    """
    from datetime import timezone  # local import: only needed for the UTC stamp

    ensure_storage_dir()
    filepath = STORAGE_DIR.joinpath(filename)

    # Records in vector-store layout carry the vector under "vector" rather than
    # "embedding"; indexing ["embedding"] directly would raise KeyError for them.
    dimensions = None
    if embeddings:
        first = embeddings[0]
        vector = first.get("embedding", first.get("vector"))
        if vector is not None:
            dimensions = len(vector)

    data = {
        "version": "1.0",
        # NOTE(review): this label is fixed and does not reflect which adapter
        # actually produced the vectors.
        "model": "sentence-transformer" if isinstance(embeddings, list) and len(embeddings) > 0 else "unknown",
        "dimensions": dimensions,
        "count": len(embeddings),
        # A "Z" suffix means UTC, so take the time in UTC instead of appending
        # "Z" to a naive local timestamp.
        "created": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "embeddings": embeddings
    }

    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)

    size = filepath.stat().st_size
    return {"filepath": str(filepath), "size": size}

In [61]:
def load_embeddings_json(filename: str) -> List[Dict[str, Any]]:
    """Read ``STORAGE_DIR/<filename>`` and return the list stored under its "embeddings" key."""
    source = STORAGE_DIR.joinpath(filename)
    payload = json.loads(source.read_text(encoding="utf-8"))
    return payload["embeddings"]

In [62]:
def load_existing_embeddings(filename: str) -> Dict[str, Dict[str, Any]]:
    """Index previously saved records by their id; a missing file gives an empty index."""
    try:
        stored = load_embeddings_json(filename)
    except FileNotFoundError:
        return {}

    by_id: Dict[str, Dict[str, Any]] = {}
    for record in stored:
        # Later records overwrite earlier ones that share an id.
        by_id[record["id"]] = record
    return by_id

In [63]:
def incremental_embedding(
        adapter: EmbeddingAdapter,
        new_documents: List[Document],
        existing_filename: str,
        batch_size: int = 8
) -> List[Dict[str, Any]]:
    """Embed only the documents that are not already stored in ``existing_filename``.

    Args:
        adapter: Backend used to embed the missing documents.
        new_documents: Candidate documents; those whose id is already stored
            (or repeated earlier in this list) are skipped.
        existing_filename: File inside STORAGE_DIR holding earlier records.
        batch_size: Batch size forwarded to ``generate_embeddings``.

    Returns:
        The stored records followed by the newly generated ones. Nothing is
        written to disk here.
    """
    existing_map = load_existing_embeddings(existing_filename)

    docs_to_embed: List[Document] = []
    queued_ids = set()
    for doc in new_documents:
        doc_id = getattr(doc, "id", None)
        # Use the same fallback id that generate_embeddings stores for id-less
        # documents; any other fallback could never match a stored record.
        candidate_id = doc_id if doc_id else stable_id_from_text(doc.pageContent)
        # Also skip repeats inside this batch so one id is not embedded twice.
        if candidate_id in existing_map or candidate_id in queued_ids:
            continue
        queued_ids.add(candidate_id)
        docs_to_embed.append(Document(doc.pageContent, doc.metadata, candidate_id))

    print(f"Existing embeddings: {len(existing_map)}")
    print(f"New documents to embed: {len(docs_to_embed)}")
    print(f"Skipped (already embedded): {len(new_documents) - len(docs_to_embed)}")

    if len(docs_to_embed) == 0:
        print("All documents already embedded!")
        # Nothing new: hand back the stored records as a list.
        return list(existing_map.values())

    new_embeddings = generate_embeddings(
        adapter, docs_to_embed, batch_size, on_progress=lambda cur, tot: print(f"\rEmbedding: {cur}/{tot}", end="", flush=True)
    )
    # Terminate the "\r"-rewritten progress line.
    print()

    merged = list(existing_map.values()) + new_embeddings
    return merged

In [64]:
def create_sample_documents(count: int = 50) -> List[Document]:
    """Generate ``count`` synthetic documents by cycling through fixed topics and sentence patterns."""
    topics = (
        "artificial intelligence", "machine learning", "natural language processing",
        "computer vision", "data science", "cloud computing", "cybersecurity",
        "blockchain", "quantum computing", "robotics", "virtual reality",
    )
    # "{}" marks where the topic is inserted.
    patterns = (
        "Recent advances in {} have revolutionized the technology industry.",
        "Understanding {} is crucial for modern software development.",
        "The future of {} looks promising with new innovations.",
        "Companies are investing heavily in {} research and development.",
        "{} applications are transforming how we work and live.",
    )

    docs: List[Document] = []
    for index in range(count):
        subject = topics[index % len(topics)]
        sentence = patterns[index % len(patterns)].format(subject)
        doc_id = f"doc_{index}"
        metadata = {"id": doc_id, "topic": subject, "source": "sample_generator", "index": index}
        docs.append(Document(sentence, metadata, id=doc_id))
    return docs

In [65]:
# ---------------------------------------------------------------------
# Example flows: example_1 .. example_6 (similar to JS, but adapted)
# - These are provided so you can run them later for testing.
# - They call the helper functions above.
# ---------------------------------------------------------------------
def example_1(adapter: EmbeddingAdapter):
    """Batch embedding generation: embed 100 sample documents and report timing.

    Args:
        adapter: Embedding backend to benchmark.

    Returns:
        The list of generated embedding records.
    """
    print("Loading embedding model (adapter)...")
    docs = create_sample_documents(100)
    print(f"Total Documents: {len(docs)}")
    print("\nGenerating embeddings:")
    # perf_counter is a high-resolution clock meant for measuring intervals.
    start = time.perf_counter()
    embeddings = generate_embeddings(adapter, docs, batch_size=16, on_progress=lambda c, t: print(f"\rProgress: {c}/{t}", end="", flush=True))
    duration = time.perf_counter() - start
    print(f"\n✓ Completed in {duration:.2f}s")
    avg_ms = (duration / len(docs)) * 1000
    # Guard the division: a very fast adapter can finish in a zero-length interval.
    throughput = len(docs) / duration if duration > 0 else float("inf")
    print(f"Total Embeddings: {len(embeddings)}, Average Time: {avg_ms:.2f}ms per document, Throughput: {throughput:.2f} docs/sec")
    return embeddings

def example_2(adapter: EmbeddingAdapter):
    """Round-trip demo: embed 50 sample documents, save them, then load them back."""
    target = "embeddings.json"
    sample_docs = create_sample_documents(50)

    print("\nPhase 1: Generate and Save")
    t0 = time.time()
    generated = generate_embeddings(adapter, sample_docs, batch_size=8)
    elapsed_s = time.time() - t0
    saved = save_embeddings_json(generated, target)
    size_mb = saved['size'] / 1024 / 1024
    print(f"✓ Generated {len(generated)} embeddings in {elapsed_s:.2f}s")
    print(f"✓ Saved to {saved['filepath']} (size: {size_mb:.2f} MB)")

    print("\nPhase 2: Load from Disk")
    t1 = time.time()
    restored = load_embeddings_json(target)
    elapsed_ms = (time.time() - t1) * 1000
    print(f"✓ Loaded {len(restored)} embeddings in {elapsed_ms:.0f}ms")
    return restored

def example_3(adapter: EmbeddingAdapter):
    """Incremental updates: save 30 embeddings, then add 20 new documents plus 5 duplicates.

    Args:
        adapter: Embedding backend to use.

    Returns:
        The merged list of records after the incremental update.
    """
    print("\nPhase 1: Initial Batch")
    initial_docs = create_sample_documents(30)
    initial_embeddings = generate_embeddings(adapter, initial_docs, batch_size=8)
    save_embeddings_json(initial_embeddings, "incremental.json")
    print(f"✓ Generated and saved {len(initial_embeddings)} embeddings")

    print("\nPhase 2: Add New Documents")
    new_docs = []
    for offset, base in enumerate(create_sample_documents(20)):
        index = 30 + offset
        doc_id = f"doc_{index}"
        # Keep the metadata in step with the re-assigned id; reusing the
        # generator's metadata as-is would still say "doc_<offset>".
        metadata = {**base.metadata, "id": doc_id, "index": index}
        new_docs.append(Document(base.pageContent, metadata, id=doc_id))
    duplicate_docs = create_sample_documents(5)  # duplicates: doc_0..doc_4
    all_docs = new_docs + duplicate_docs

    print(f"\nAttempting to process {len(all_docs)} documents:")
    print(f"  - {len(new_docs)} new documents")
    print(f"  - {len(duplicate_docs)} duplicates (should skip)")

    start_update = time.time()
    updated = incremental_embedding(adapter, all_docs, "incremental.json", batch_size=8)
    update_time = time.time() - start_update

    save_embeddings_json(updated, "incremental.json")

    # Report what actually happened rather than what the inputs were expected
    # to produce, so a dedup regression shows up in the numbers.
    new_count = len(updated) - len(initial_embeddings)
    skipped_count = len(all_docs) - new_count

    print("\nResults:")
    print(f" Total Embeddings: {len(updated)}")
    print(f" Update Time: {update_time:.2f}s")
    print(f" New Embeddings: {new_count}")
    print(f" Skipped: {skipped_count}")
    return updated

def example_4(adapter: EmbeddingAdapter):
    """Report on-disk size and load time for the JSON format (the only one implemented here)."""
    target = "comparison.json"
    sample_docs = create_sample_documents(100)
    print("\nGenerating embeddings for format comparison...")
    vectors = generate_embeddings(adapter, sample_docs, batch_size=16)

    print("\nSaving formats:")
    json_info = save_embeddings_json(vectors, target)
    size_mb = json_info['size'] / 1024 / 1024
    print(f"✓ JSON: {size_mb:.2f} MB")

    print("\nLoad Time Comparison:")
    t0 = time.time()
    load_embeddings_json(target)  # result discarded; only the duration matters
    elapsed_ms = (time.time() - t0) * 1000
    print(f"JSON: {elapsed_ms:.0f}ms")
    return json_info


def example_5(adapter: EmbeddingAdapter):
    """Embed three mock PDF chunks and reshape the records into an id/vector/metadata layout."""
    print("\nSimulating real document processing (chunks)...")
    # (text, page, chunk index, document id) for each mock chunk.
    chunk_specs = [
        ("Introduction to machine learning and its applications in modern technology.", 1, 0, "pdf_1_chunk_0"),
        ("Supervised learning algorithms include decision trees and neural networks.", 1, 1, "pdf_1_chunk_1"),
        ("Deep learning has revolutionized computer vision and natural language processing.", 2, 2, "pdf_1_chunk_2"),
    ]
    mock_chunks = [
        Document(text, {"source": "ml_guide.pdf", "page": page, "chunk": chunk, "totalChunks": 3}, id=chunk_id)
        for text, page, chunk, chunk_id in chunk_specs
    ]
    embeddings = generate_embeddings(adapter, mock_chunks, batch_size=3)

    # Reshape each record into the vector-store layout.
    vector_store_format = []
    for record in embeddings:
        source_meta = record["metadata"]
        vector_store_format.append({
            "id": record["id"],
            "vector": record["embedding"],
            "metadata": {
                "content": record["content"],
                "source": source_meta.get("source"),
                "page": source_meta.get("page"),
                "chunk": source_meta.get("chunk"),
                "timestamp": record["timestamp"]
            }
        })

    save_embeddings_json(vector_store_format, "vector_store_ready.json")
    print("Vector Store Format Example (first item):")
    preview = json.dumps(vector_store_format[0], indent=2)
    print(preview[:400] + "...")
    return vector_store_format

def example_6(adapter: EmbeddingAdapter):
    """Outline of a real-world PDF processing flow.

    Nothing is downloaded or embedded here; the function only prints guidance
    and returns None. The intended steps are:
      1) Load PDF with a PDF loader
      2) Split into chunks with a text splitter
      3) Generate embeddings for each chunk
      4) Save to disk
    """
    guidance = (
        "\nReal-World PDF Processing Workflow (demo):",
        "This example requires local PDFs or internet access to download a PDF.",
        "Use your own PDF loader and RecursiveCharacterTextSplitter equivalent in Python.",
    )
    for line in guidance:
        print(line)
    return None

In [66]:
def run_all_examples(use_dummy: bool = False):
    """Run example_1 .. example_6 in order with a single shared adapter.

    Args:
        use_dummy: When true, use the deterministic dummy adapter (dim 384)
            instead of loading the SentenceTransformer model. Defaults to
            False, i.e. the real model.

    Any exception raised while loading the adapter or running an example is
    reported (type, message and traceback) and then swallowed, so the call
    itself does not raise.
    """
    print("\n" + "=" * 80)
    print("RAG from Scratch - Generate Embeddings (Python)")
    print("=" * 80 + "\n")

    # Name the adapter that will really be used rather than a fixed label.
    adapter_label = "dummy" if use_dummy else "SentenceTransformer"
    print("Prerequisites:")
    print(f" • Model adapter ({adapter_label})")
    print(" • Python environment with numpy installed\n")

    try:
        # Loading is inside the try so a missing model or package gets the same
        # guidance as a failing example. `dim` only applies to the dummy adapter.
        if use_dummy:
            adapter = initializeEmbeddingModel(use_dummy=True, dim=384)
        else:
            adapter = initializeEmbeddingModel(use_dummy=False)

        print("Example 1: Batch Embedding Generation")
        example_1(adapter)

        print("\nExample 2: Save and Load Embeddings")
        example_2(adapter)

        print("\nExample 3: Incremental Updates")
        example_3(adapter)

        print("\nExample 4: Storage Format Comparison")
        example_4(adapter)

        print("\nExample 5: Preparing for Vector Stores")
        example_5(adapter)

        print("\nExample 6: Real-World PDF Processing (demo)")
        example_6(adapter)

        print("\n✅ All examples completed successfully!")
        print("Key Takeaways:")
        print(" • Generate embeddings in batches for efficiency")
        print(" • Always save embeddings to avoid re-computation")
        print(" • Use incremental updates for new documents")
        print(" • Structure data properly for vector stores\n")
    except Exception as e:
        import traceback  # local import: only needed on the failure path

        # str(e) alone can be opaque (a KeyError prints just the key), so
        # include the exception type and the full traceback.
        print(f"\n❌ Error: {type(e).__name__}: {e}")
        traceback.print_exc()
        print("Make sure your environment has numpy and optional adapters if used.")
        print("If using a real model, ensure MODEL_PATH exists and your adapter is implemented.")

In [67]:
# Run every example in order; the captured output follows this cell.
run_all_examples()


RAG from Scratch - Generate Embeddings (Python)

Prerequisites:
 • Model adapter (dummy by default)
 • Python environment with numpy installed

Loading SentenceTransformer model: all-MiniLM-L6-v2
✓ Model loaded successfully
Example 1: Batch Embedding Generation
Loading embedding model (adapter)...
Total Documents: 100

Generating embeddings:
Progress: 100/100
✓ Completed in 0.34s
Total Embeddings: 100, Average Time: 3.39ms per document, Throughput: 295.20 docs/sec

Example 2: Save and Load Embeddings

Phase 1: Generate and Save
✓ Generated 50 embeddings in 0.15s
✓ Saved to c:\Projects\rag-from-scratch\embedding\embeddings.json (size: 0.58 MB)

Phase 2: Load from Disk
✓ Loaded 50 embeddings in 20ms

Example 3: Incremental Updates

Phase 1: Initial Batch
✓ Generated and saved 30 embeddings

Phase 2: Add New Documents

Attempting to process 25 documents:
  - 20 new documents
  - 5 duplicates (should skip)
Existing embeddings: 30
New documents to embed: 20
Skipped (already embedded): 5
Em