# Data Ingestion Pipeline

This notebook demonstrates how to:
1. Load markdown files from the data directory
2. Chunk the text into manageable pieces
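3. Ingest the chunks into Qdrant via the API (which handles embedding generation)
4. Test retrieval and the full RAG pipeline (retrieval + generation) against the ingested collection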

**IMPORTANT**: Make sure the API server is running before using this notebook!
- Run: `make dev` or `docker compose up`

In [1]:
from pathlib import Path
from typing import List, Dict
import httpx

## Configuration

In [2]:
# API Configuration
API_BASE_URL = "http://localhost:8000"
COLLECTION_NAME = "temp"

# Data paths
DATA_DIR = "../../data"

# Chunking settings
CHUNK_SIZE = 500  # characters
CHUNK_OVERLAP = 50  # characters

print(f"API URL: {API_BASE_URL}")
print(f"Collection: {COLLECTION_NAME}")
print(f"Data directory: {DATA_DIR}")

API URL: http://localhost:8000
Collection: temp
Data directory: ../../data


## Verify API Connection

In [3]:
# Create HTTP client
client = httpx.Client(timeout=60.0)

# Check API health
try:
    response = client.get(f"{API_BASE_URL}/v1/health")
    response.raise_for_status()
    health = response.json()
    print(f"✓ API is running")
    print(f"  Version: {health['version']}")
    print(f"  Timestamp: {health['timestamp']}")
except httpx.ConnectError:
    print("✗ Cannot connect to API!")
    print("Make sure the API is running: make dev")
except Exception as e:
    print(f"✗ Error: {e}")

✓ API is running
  Version: 0.1.0
  Timestamp: 1771347119.3295116
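If the notebook is started while `make dev` or `docker compose up` is still booting, the health check above fails once and simply moves on. A small polling helper can retry the check with a delay. This is a minimal sketch using only the standard library. The name `wait_until_ready` is hypothetical and is not part of the API or of httpx.

```python
import time
from typing import Callable


def wait_until_ready(check: Callable[[], bool], retries: int = 10, delay: float = 1.0) -> bool:
    """Call `check` until it returns True or the retry budget is exhausted.

    `check` is any zero-argument callable (hypothetical usage: a lambda that
    hits the /v1/health endpoint and reports whether the status was 2xx).
    """
    for attempt in range(1, retries + 1):
        try:
            if check():
                return True
        except Exception as exc:  # e.g. a connection error while the server boots
            print(f"Attempt {attempt}/{retries} failed: {exc}")
        if attempt < retries:
            time.sleep(delay)  # back off before the next attempt
    return False
```

In this notebook the check could be `lambda: client.get(f"{API_BASE_URL}/v1/health").is_success`, since `httpx.Response.is_success` is true for 2xx status codes.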


## Helper Functions

In [4]:
def load_markdown_files(data_dir: str) -> List[Dict[str, str]]:
    """Load all markdown files from the data directory."""
    documents = []
    data_path = Path(data_dir)
    
    for md_file in data_path.glob("*.md"):
        with open(md_file, "r", encoding="utf-8") as f:
            content = f.read()
            documents.append({
                "filename": md_file.name,
                "filepath": str(md_file.absolute()),
                "content": content
            })
    
    return documents
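`Path.glob("*.md")` only matches files at the top level of `DATA_DIR`, and its ordering depends on the filesystem. The output below lists `sample3.md` before `sample1.md`. The variant sketched here recurses with `rglob` and sorts the paths so the chunk order is reproducible across runs. It also uses `Path.resolve()`, which collapses the `..` segments that `absolute()` leaves in `filepath`. The function name is hypothetical.

```python
from pathlib import Path
from typing import Dict, List


def load_markdown_files_sorted(data_dir: str, recursive: bool = True) -> List[Dict[str, str]]:
    """Load markdown files in a deterministic (sorted) order, optionally from subdirectories."""
    data_path = Path(data_dir)
    matches = data_path.rglob("*.md") if recursive else data_path.glob("*.md")
    documents = []
    for md_file in sorted(matches):  # sorting makes the order filesystem-independent
        documents.append({
            "filename": md_file.name,
            "filepath": str(md_file.resolve()),  # normalized absolute path, no '..'
            "content": md_file.read_text(encoding="utf-8"),
        })
    return documents
```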

In [5]:
def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
    """
    Split text into overlapping chunks based on character count.
    Uses paragraph-aware chunking for better semantic boundaries.
    
    Args:
        text: Text to chunk
        chunk_size: Maximum characters per chunk
        overlap: Number of overlapping characters between chunks
    
    Returns:
        List of text chunks
    """
    # Split by double newline (paragraphs) first
    paragraphs = text.split("\n\n")
    
    chunks = []
    current_chunk = ""
    
    for para in paragraphs:
        para = para.strip()
        if not para:
            continue
        
        # If adding this paragraph exceeds chunk size and we have content
        if len(current_chunk) + len(para) > chunk_size and current_chunk:
            chunks.append(current_chunk.strip())
            # Start new chunk with overlap
            overlap_start = max(0, len(current_chunk) - overlap)
            current_chunk = current_chunk[overlap_start:] + "\n\n" + para
        else:
            if current_chunk:
                current_chunk += "\n\n" + para
            else:
                current_chunk = para
    
    # Add remaining chunk
    if current_chunk.strip():
        chunks.append(current_chunk.strip())
    
    return chunks if chunks else [text]
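`chunk_text` takes its overlap as a raw character slice (`current_chunk[overlap_start:]`), so the carried-over text can begin in the middle of a word. The retrieval output further down shows a chunk that starts with a stray `s`. One possible fix is to snap the cut point forward to the next whitespace. This is a sketch of that idea, and `word_aligned_tail` is a hypothetical helper, not part of the notebook.

```python
def word_aligned_tail(text: str, overlap: int) -> str:
    """Return roughly the last `overlap` characters of `text`, starting on a whole word.

    If the raw cut point falls inside a word, it is moved forward past that
    word. Returns "" when no word boundary exists inside the window.
    """
    if overlap <= 0 or not text:
        return ""
    start = max(0, len(text) - overlap)
    # The cut already sits on a boundary: start of text, or just after whitespace
    if start == 0 or text[start - 1].isspace():
        return text[start:].lstrip()
    # Otherwise skip the partial word up to the next whitespace
    while start < len(text) and not text[start].isspace():
        start += 1
    return text[start:].lstrip()
```

Inside `chunk_text`, the two overlap lines could then become `tail = word_aligned_tail(current_chunk, overlap)` followed by `current_chunk = f"{tail}\n\n{para}" if tail else para`. The chunk counts shown below would change after such an edit.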

## Create Collection

In [6]:
# Create collection (or use existing)
try:
    response = client.post(
        f"{API_BASE_URL}/v1/collections/",
        json={
            "name": COLLECTION_NAME,
            "dimension": 384,  # all-MiniLM-L6-v2 dimension
            "distance_metric": "cosine"
        }
    )
    
    if response.status_code == 201:
        print(f"✓ Created collection '{COLLECTION_NAME}'")
    elif response.status_code == 400 and "already exists" in response.text.lower():
        print(f"✓ Collection '{COLLECTION_NAME}' already exists")
    else:
        print(f"Response: {response.status_code} - {response.text}")
        
except Exception as e:
    print(f"Error: {e}")

✓ Created collection 'temp'
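The collection is created with `"distance_metric": "cosine"` and 384-dimensional vectors, which matches the output size of all-MiniLM-L6-v2. Cosine similarity compares only the direction of two vectors and ignores their magnitude. The plain-Python sketch below is illustrative only, because the vector store computes this internally.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors: dot(a, b) / (|a| * |b|)."""
    if len(a) != len(b):
        # Vectors from models with different dimensions cannot be compared
        raise ValueError(f"dimension mismatch: {len(a)} vs {len(b)}")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine similarity is undefined for zero vectors")
    return dot / (norm_a * norm_b)
```

The dimension check shows why `"dimension": 384` must match the embedding model. A vector of any other length cannot be scored against the stored ones.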


## Load and Inspect Data

In [7]:
# Load markdown files
documents = load_markdown_files(DATA_DIR)
print(f"Loaded {len(documents)} markdown files:")
for doc in documents:
    print(f"  - {doc['filename']} ({len(doc['content'])} characters)")

Loaded 2 markdown files:
  - sample3.md (1896 characters)
  - sample1.md (3297 characters)


In [8]:
# Preview first document
if documents:
    print("\nFirst document preview:")
    print("=" * 80)
    print(documents[0]['content'][:500])
    print("...")
    print("=" * 80)


First document preview:
# FastAPI: Modern Python Web Framework

FastAPI is a modern, high-performance web framework for building APIs with Python. It is based on standard Python type hints and provides automatic data validation, serialization, and interactive API documentation.

## Key Features

- **High Performance**: FastAPI is one of the fastest Python frameworks, comparable to NodeJS and Go, thanks to its use of Starlette for the web parts and Pydantic for data handling.
- **Type Safety**: Leverages Python type hin
...


## Chunk Documents

In [9]:
# Chunk all documents
all_chunks = []

for doc in documents:
    chunks = chunk_text(doc['content'], CHUNK_SIZE, CHUNK_OVERLAP)
    
    for idx, chunk in enumerate(chunks):
        all_chunks.append({
            "text": chunk,
            "filename": doc['filename'],
            "filepath": doc['filepath'],
            "chunk_index": idx,
            "total_chunks": len(chunks)
        })

print(f"Total chunks created: {len(all_chunks)}")
print(f"\nChunks per document:")
for doc in documents:
    doc_chunks = [c for c in all_chunks if c['filename'] == doc['filename']]
    print(f"  - {doc['filename']}: {len(doc_chunks)} chunks")

Total chunks created: 15

Chunks per document:
  - sample3.md: 5 chunks
  - sample1.md: 10 chunks
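`CHUNK_SIZE` is only a soft limit in `chunk_text`, so it can be worth checking the actual length distribution before ingesting. The helper below is a minimal sketch with a hypothetical name, `chunk_length_report`. It works on lists shaped like `all_chunks`.

```python
from statistics import mean
from typing import Dict, List


def chunk_length_report(chunks: List[Dict], limit: int) -> Dict[str, float]:
    """Summarize chunk lengths and count chunks longer than `limit` characters."""
    lengths = [len(c["text"]) for c in chunks]
    if not lengths:
        return {"count": 0, "min": 0, "max": 0, "mean": 0.0, "over_limit": 0}
    return {
        "count": len(lengths),
        "min": min(lengths),
        "max": max(lengths),
        "mean": mean(lengths),
        "over_limit": sum(1 for n in lengths if n > limit),  # soft-limit violations
    }
```

For example, `print(chunk_length_report(all_chunks, CHUNK_SIZE))` shows whether any chunk exceeds the configured size.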


In [10]:
# Preview a chunk
if all_chunks:
    print("\nSample chunk:")
    print("=" * 80)
    print(f"File: {all_chunks[0]['filename']}")
    print(f"Chunk: {all_chunks[0]['chunk_index'] + 1}/{all_chunks[0]['total_chunks']}")
    print(f"Length: {len(all_chunks[0]['text'])} chars")
    print(f"\nText:\n{all_chunks[0]['text'][:300]}...")
    print("=" * 80)


Sample chunk:
File: sample3.md
Chunk: 1/5
Length: 271 chars

Text:
# FastAPI: Modern Python Web Framework

FastAPI is a modern, high-performance web framework for building APIs with Python. It is based on standard Python type hints and provides automatic data validation, serialization, and interactive API documentation.

## Key Features...


## Prepare Datapoints for API

In [11]:
# Prepare datapoints for bulk insert
# The API will automatically generate embeddings for each text
datapoints = []

for chunk in all_chunks:
    datapoints.append({
        "text": chunk['text'],
        "metadata": {
            "filename": chunk['filename'],
            "filepath": chunk['filepath'],
            "chunk_index": chunk['chunk_index'],
            "total_chunks": chunk['total_chunks']
        }
    })

print(f"Prepared {len(datapoints)} datapoints for ingestion")
print(f"\nSample datapoint:")
print(datapoints[0])

Prepared 15 datapoints for ingestion

Sample datapoint:
{'text': '# FastAPI: Modern Python Web Framework\n\nFastAPI is a modern, high-performance web framework for building APIs with Python. It is based on standard Python type hints and provides automatic data validation, serialization, and interactive API documentation.\n\n## Key Features', 'metadata': {'filename': 'sample3.md', 'filepath': '/Users/steffen/rag_template/backend/notebooks/../../data/sample3.md', 'chunk_index': 0, 'total_chunks': 5}}
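Nothing in a datapoint identifies it, so re-running the ingestion cell inserts the same chunks a second time. A common remedy is a stable ID derived from the source path and chunk index, which lets a re-ingest overwrite existing points instead of duplicating them. Qdrant accepts unsigned integers or UUIDs as point IDs. However, it is an assumption, not confirmed by this notebook, that the API's bulk endpoint accepts a caller-supplied `id` field. The sketch below uses a hypothetical `stable_chunk_id` and a hypothetical namespace string.

```python
import uuid

# Hypothetical namespace: any fixed UUID works, as long as it never changes
CHUNK_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL, "rag-template/chunks")


def stable_chunk_id(filepath: str, chunk_index: int) -> str:
    """Derive a deterministic UUID (version 5) for a chunk from its source and position."""
    return str(uuid.uuid5(CHUNK_NAMESPACE, f"{filepath}#{chunk_index}"))
```

Keying on the absolute `filepath` ties the IDs to one machine. A path relative to `DATA_DIR` would be more portable.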


## Ingest into Qdrant

In [12]:
# Ingest in batches
BATCH_SIZE = 50
total_inserted = 0
failed_batches = 0
total_batches = (len(datapoints) + BATCH_SIZE - 1) // BATCH_SIZE  # ceiling division

print(f"Ingesting {len(datapoints)} datapoints in batches of {BATCH_SIZE}...\n")

for i in range(0, len(datapoints), BATCH_SIZE):
    batch = datapoints[i:i + BATCH_SIZE]
    batch_num = i // BATCH_SIZE + 1
    
    try:
        response = client.post(
            f"{API_BASE_URL}/v1/collections/{COLLECTION_NAME}/datapoints/bulk",
            json=batch
        )
        response.raise_for_status()
        result = response.json()
        # Fall back to the batch size if the API does not report a count
        inserted = result.get("inserted_count", len(batch))
        total_inserted += inserted
        print(f"  ✓ Batch {batch_num}/{total_batches}: Inserted {inserted} datapoints")
        
    except httpx.HTTPStatusError as e:
        # Record the failure and keep going with the remaining batches
        failed_batches += 1
        print(f"  ✗ Batch {batch_num}/{total_batches} failed: {e}")
        print(f"    Response: {e.response.text}")

if failed_batches:
    print(f"\n✗ Ingestion finished with {failed_batches} failed batch(es). Total inserted: {total_inserted} datapoints")
else:
    print(f"\n✓ Ingestion complete! Total inserted: {total_inserted} datapoints")

Ingesting 15 datapoints in batches of 50...

  ✓ Batch 1/1: Inserted 15 datapoints

✓ Ingestion complete! Total inserted: 15 datapoints
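The ingestion loop slices `datapoints` by hand and computes the batch count with ceiling division, `(n + BATCH_SIZE - 1) // BATCH_SIZE`. The same logic fits in a small reusable generator. Python 3.12+ ships `itertools.batched`, which yields tuples. This sketch, with hypothetical names, yields lists and also works on older versions.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def batched_list(items: Sequence[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive slices of at most `size` items; the last one may be shorter."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


def num_batches(n_items: int, size: int) -> int:
    """Ceiling division: how many batches of `size` are needed for `n_items`."""
    return (n_items + size - 1) // size
```

The loop header could then read `for batch_num, batch in enumerate(batched_list(datapoints, BATCH_SIZE), start=1):`.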


## Test Retrieval

In [13]:
# Test retrieval + reranking only (no LLM generation; the next section calls the full RAG endpoint)
test_query = "What is RAG and how does it work?"
print(f"Test query: {test_query}\n")

try:
    response = client.post(
        f"{API_BASE_URL}/v1/query/",
        json={
            "question": test_query,
            "collection_name": COLLECTION_NAME,
            "n_retrieval": 5,
            "n_ranking": 3
        }
    )
    response.raise_for_status()
    results = response.json()
    
    print(f"Top {len(results['results'])} results:\n")
    for i, result in enumerate(results['results'], 1):
        print(f"{i}. Score: {result['score']:.4f}")
        print(f"   File: {result['metadata'].get('filename', 'N/A')}")
        print(f"   Chunk: {result['metadata'].get('chunk_index', 0) + 1}/{result['metadata'].get('total_chunks', 1)}")
        print(f"   Text: {result['text'][:200]}...")
        print()
        
except Exception as e:
    print(f"Error during query: {e}")

Test query: What is RAG and how does it work?

Top 3 results:

1. Score: 7.6712
   File: sample1.md
   Chunk: 1/10
   Text: # Retrieval-Augmented Generation (RAG)

Retrieval-Augmented Generation (RAG) is an AI framework that enhances large language model (LLM) outputs by incorporating external knowledge retrieval into the ...

2. Score: 3.1827
   File: sample1.md
   Chunk: 4/10
   Text: in the retrieved information.

## Benefits of RAG

- **Reduced hallucinations**: By grounding responses in actual documents, RAG significantly reduces the tendency of LLMs to generate factually incorr...

3. Score: 1.3304
   File: sample1.md
   Chunk: 2/10
   Text: s

The RAG pipeline consists of three main stages:

1. **Indexing**: Documents are preprocessed, split into chunks, and converted into vector embeddings. These embeddings are stored in a vector databa...
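The request asks for `n_retrieval: 5` candidates and `n_ranking: 3` final results. This reads as a two-stage search: a fast vector lookup returns 5 candidates, then a reranker rescores them and keeps the best 3. The scores above (7.6712, 3.1827, 1.3304) exceed 1, so they are presumably reranker scores rather than cosine similarities. The notebook does not say which reranker the API uses. The sketch below shows the general two-stage pattern, and both scoring functions are hypothetical stand-ins.

```python
from typing import Callable, List, Tuple


def retrieve_then_rerank(
    query: str,
    corpus: List[str],
    first_stage: Callable[[str, str], float],
    reranker: Callable[[str, str], float],
    n_retrieval: int = 5,
    n_ranking: int = 3,
) -> List[Tuple[str, float]]:
    """Two-stage search: a cheap scorer picks candidates, a costlier scorer reorders them."""
    # Stage 1: score every document cheaply and keep the top n_retrieval
    candidates = sorted(corpus, key=lambda doc: first_stage(query, doc), reverse=True)[:n_retrieval]
    # Stage 2: rescore only the candidates and keep the top n_ranking
    rescored = [(doc, reranker(query, doc)) for doc in candidates]
    rescored.sort(key=lambda pair: pair[1], reverse=True)
    return rescored[:n_ranking]
```

Only documents that survive stage 1 can appear in the final list. The reranker never sees the rest of the collection, which keeps the expensive model's cost bounded by `n_retrieval`.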



## Test Full RAG (with Generation)

In [14]:
# Test the full RAG pipeline (retrieval + generation)
test_question = "What are the benefits of RAG?"
print(f"Question: {test_question}\n")

try:
    response = client.post(
        f"{API_BASE_URL}/v1/rag/",
        json={
            "question": test_question,
            "collection_name": COLLECTION_NAME,
            "n_retrieval": 5,
            "n_ranking": 3
        }
    )
    response.raise_for_status()
    result = response.json()
    
    print("Answer:")
    print("=" * 80)
    print(result['answer'])
    print("=" * 80)
    
except Exception as e:
    print(f"Error during RAG: {e}")

Question: What are the benefits of RAG?

Answer:
[chain_of_thought]
The question is "What are the benefits of RAG?".
The context provides a section titled "Benefits of RAG" that lists the benefits of RAG.
I will combine the question with the information in the context to create an enhanced question.

[Enhanced_Question]
According to the provided information, what are the benefits of Retrieval-Augmented Generation (RAG)?
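The `/v1/rag/` endpoint combines retrieval with an LLM call. The notebook does not show how the API builds its prompt, and the printed `answer` above contains a chain of thought and an enhanced question rather than a direct answer. The following is therefore only a generic sketch of grounding a prompt in retrieved chunks. The function name `build_rag_prompt` and the template wording are hypothetical.

```python
from typing import Dict, List


def build_rag_prompt(question: str, chunks: List[Dict]) -> str:
    """Assemble a grounded prompt: numbered context passages followed by the question.

    Each chunk is expected to carry 'text' and, optionally, a 'metadata' dict with
    'filename', the same shape as the results printed by the /v1/query/ cell.
    """
    context_blocks = []
    for i, chunk in enumerate(chunks, 1):
        source = chunk.get("metadata", {}).get("filename", "unknown")
        context_blocks.append(f"[{i}] (source: {source})\n{chunk['text']}")
    context = "\n\n".join(context_blocks)
    return (
        "Answer the question using only the context below. "
        "Cite passages by their [number]. If the context is insufficient, say so.\n\n"
        f"Context:\n{context}\n\nQuestion: {question}\nAnswer:"
    )
```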



## Cleanup

In [15]:
# Close HTTP client
client.close()
print("✓ HTTP client closed")

✓ HTTP client closed
