In [1]:
import PyPDF2
from transformers import pipeline
from elasticsearch import Elasticsearch

In [29]:
import re
from typing import List, Dict, Optional

def clean_title(raw: str) -> str:
    """Remove trailing spaces, dots, and page numbers."""
    cleaned = re.sub(r'\s*[\.\s]+[\d\s]*$', '', raw)
    return cleaned.strip()

def extract_chapters(text: str) -> List[Dict]:
    """
    Find all chapter headings (lines like "1 INTRODUCTION ...").
    Returns list of dicts with chapter number, title, and start/end positions.
    """
    chapter_pattern = re.compile(
        r'^(?!.*Table|.*Figure|.*Map)(\d{1,2})\s+([A-Z][A-Z\s,.-]+?)\s*$',
        re.MULTILINE
    )
    chapters = []
    for match in chapter_pattern.finditer(text):
        chapters.append({
            'num': match.group(1),
            'title': clean_title(match.group(2)),
            'start': match.start(),
            'end': match.end()
        })
    return chapters

def chunk_kdhs_report(file_path: str) -> List[Dict]:
    """
    Twoâ€‘level chunking: chapters, then sections within each chapter.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        text = f.read()
    text = re.sub(r'\r\n', '\n', text)

    # ---- Step 1: Extract chapters ----
    chapters = extract_chapters(text)
    if not chapters:
        raise ValueError("No chapters found.")

    # Add a sentinel end for the last chapter
    for i in range(len(chapters)):
        chap = chapters[i]
        chap['end'] = chapters[i+1]['start'] if i+1 < len(chapters) else len(text)

    # ---- Step 2: Front matter (before first chapter) ----
    chunks = []
    if chapters[0]['start'] > 0:
        front = text[:chapters[0]['start']].strip()
        if front:
            chunks.append({
                'type': 'front_matter',
                'text': front
            })

    # ---- Step 3: Process each chapter ----
    for chap in chapters:
        chapter_text = text[chap['start']:chap['end']].strip()
        chapter_num = chap['num']
        chapter_title = chap['title']

        # Find section headings within this chapter's text
        # Pattern: line starts with digit.digit, space, then any chars, end of line
        section_pattern = re.compile(r'^(\d+\.\d+)\s+(.+?)\s*$', re.MULTILINE)
        sections = list(section_pattern.finditer(chapter_text))

        if not sections:
            # Chapter without sections â€“ keep as one chunk
            chunks.append({
                'type': 'chapter',
                'chapter_num': chapter_num,
                'chapter_title': chapter_title,
                'text': chapter_text
            })
            continue

        # Chapter intro (before first section)
        intro_text = chapter_text[:sections[0].start()].strip()
        if intro_text:
            chunks.append({
                'type': 'chapter_intro',
                'chapter_num': chapter_num,
                'chapter_title': chapter_title,
                'text': intro_text
            })

        # Each section
        for i, sec in enumerate(sections):
            sec_num = sec.group(1)
            raw_title = sec.group(2)
            sec_title = clean_title(raw_title)
            sec_start = sec.start()
            sec_end = sections[i+1].start() if i+1 < len(sections) else len(chapter_text)
            sec_text = chapter_text[sec_start:sec_end].strip()

            chunks.append({
                'type': 'section',
                'chapter_num': chapter_num,
                'chapter_title': chapter_title,
                'section_num': sec_num,
                'section_title': sec_title,
                'text': sec_text
            })

    return chunks


# ---- Example usage ----
if __name__ == "__main__":
    chunks = chunk_kdhs_report("kdhs_2022_extracted.txt")
    print(f"Total chunks: {len(chunks)}")

    # Show first 10 chunks
    for i, c in enumerate(chunks[:10]):
        print(f"\n--- Chunk {i+1} ---")
        if c['type'] == 'front_matter':
            print("Type: Front Matter")
        elif c['type'] == 'chapter':
            print(f"Chapter {c['chapter_num']}: {c['chapter_title']} (whole chapter)")
        elif c['type'] == 'chapter_intro':
            print(f"Chapter {c['chapter_num']}: {c['chapter_title']} (intro)")
        else:  # section
            print(f"Chapter {c['chapter_num']} â€“ Section {c['section_num']}: {c['section_title']}")
        print("Preview:", c['text'][:150].replace('\n', ' '), "...")

Total chunks: 197

--- Chunk 1 ---
Type: Front Matter
Preview: Kenya Demographic and  Health Survey 2022 Demographic and Health Survey  â€¢  Volume 1  Kenya 2022 Volume 1          Kenya   Demographic and Health Surv ...

--- Chunk 2 ---
Chapter 13: KNOWLEDGE, ATTITUDES, AND BEHAVIOUR RELATED TO HIV, AIDS, AND (intro)
Preview: 13 KNOWLEDGE, ATTITUDES, AND BEHAVIOUR RELATED TO HIV, AIDS, AND  TUBERCULOSIS  ................................ ................................ .... ...

--- Chunk 3 ---
Chapter 13 â€“ Section 13.1: Tuberculosis: Knowledge, Diagnosis, and Preventive Treatment
Preview: 13.1 Tuberculosis: Knowledge, Diagnosis, and Preventive Treatment  ................................ ... 422  13.1.1  Knowledge and Beliefs about Tuber ...

--- Chunk 4 ---
Chapter 13 â€“ Section 13.2: Knowledge and Attitudes about Medicines to Treat or Prevent HIV
Preview: 13.2 Knowledge and Attitudes about Medicines to Treat or Prevent HIV  ..............................  422 ...

--- Chunk 5 ---


In [1]:
import re
from typing import List, Dict, Optional

def clean_title(raw: str) -> str:
    """Remove trailing spaces, dots, and page numbers."""
    cleaned = re.sub(r'\s*[\.\s]+[\d\s]*$', '', raw)
    return cleaned.strip()

def extract_chapters(text: str) -> List[Dict]:
    """
    Find all chapter headings.
    Pattern: optional leading spaces, then 1-2 digits, space, then any characters to end of line.
    """
    chapter_pattern = re.compile(
        r'^\s*(\d{1,2})\s+(.+?)\s*$',
        re.MULTILINE
    )
    chapters = []
    for match in chapter_pattern.finditer(text):
        # Basic heuristic: title should be mostly uppercase and reasonably long
        raw_title = match.group(2).strip()
        title = clean_title(raw_title)
        # Skip if title is too short (likely not a real chapter)
        if len(title) < 10:
            continue
        # Skip if title contains common TOC phrases (optional)
        if any(word in title.lower() for word in ['table', 'figure', 'list of']):
            continue
        chapters.append({
            'num': match.group(1),
            'title': title,
            'start': match.start(),
            'end': match.end()
        })
    return chapters

def chunk_kdhs_report(file_path: str) -> List[Dict]:
    """
    Twoâ€‘level chunking: chapters, then sections within each chapter.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        text = f.read()
    text = re.sub(r'\r\n', '\n', text)

    # ---- Step 1: Extract chapters ----
    chapters = extract_chapters(text)
    if not chapters:
        raise ValueError("No chapters found.")

    # Add a sentinel end for the last chapter
    for i in range(len(chapters)):
        chap = chapters[i]
        chap['end'] = chapters[i+1]['start'] if i+1 < len(chapters) else len(text)

    # ---- Step 2: Front matter (before first chapter) ----
    chunks = []
    if chapters[0]['start'] > 0:
        front = text[:chapters[0]['start']].strip()
        if front:
            chunks.append({
                'type': 'front_matter',
                'text': front
            })

    # ---- Step 3: Process each chapter ----
    for chap in chapters:
        chapter_text = text[chap['start']:chap['end']].strip()
        chapter_num = chap['num']
        chapter_title = chap['title']

        # Find section headings within this chapter's text
        section_pattern = re.compile(r'^\s*(\d+\.\d+)\s+(.+?)\s*$', re.MULTILINE)
        sections = list(section_pattern.finditer(chapter_text))

        if not sections:
            chunks.append({
                'type': 'chapter',
                'chapter_num': chapter_num,
                'chapter_title': chapter_title,
                'text': chapter_text
            })
            continue

        # Chapter intro (before first section)
        intro_text = chapter_text[:sections[0].start()].strip()
        if intro_text:
            chunks.append({
                'type': 'chapter_intro',
                'chapter_num': chapter_num,
                'chapter_title': chapter_title,
                'text': intro_text
            })

        # Each section
        for i, sec in enumerate(sections):
            sec_num = sec.group(1)
            raw_title = sec.group(2)
            sec_title = clean_title(raw_title)
            sec_start = sec.start()
            sec_end = sections[i+1].start() if i+1 < len(sections) else len(chapter_text)
            sec_text = chapter_text[sec_start:sec_end].strip()

            chunks.append({
                'type': 'section',
                'chapter_num': chapter_num,
                'chapter_title': chapter_title,
                'section_num': sec_num,
                'section_title': sec_title,
                'text': sec_text
            })

    return chunks


if __name__ == "__main__":
    chunks = chunk_kdhs_report("kdhs_2022_extracted.txt")
    print(f"Total chunks: {len(chunks)}")

    # Show first 20 chunks
    for i, c in enumerate(chunks[:20]):
        print(f"\n--- Chunk {i+1} ---")
        print(f"Type: {c['type']}")
        if c['type'] != 'front_matter':
            print(f"Chapter {c['chapter_num']}: {c['chapter_title']}")
            if c['type'] == 'section':
                print(f"Section {c['section_num']}: {c['section_title']}")
        print("Preview:", c['text'][:150].replace('\n', ' '), "...")

Total chunks: 1267

--- Chunk 1 ---
Type: front_matter
Preview: Kenya Demographic and  Health Survey 2022 Demographic and Health Survey  â€¢  Volume 1  Kenya 2022 Volume 1          Kenya   Demographic and Health Surv ...

--- Chunk 2 ---
Type: chapter_intro
Chapter 1: INTRODUCTION AND SURVEY METHODOLOGY
Preview: 1 INTRODUCTION AND SURVEY METHODOLOGY  ................................ ..............................  1 ...

--- Chunk 3 ---
Type: section
Chapter 1: INTRODUCTION AND SURVEY METHODOLOGY
Section 1.1: Survey Objectives
Preview: 1.1 Survey Objectives  ................................ ................................ ................................ ................  1 ...

--- Chunk 4 ---
Type: section
Chapter 1: INTRODUCTION AND SURVEY METHODOLOGY
Section 1.2: Sample Design
Preview: 1.2 Sample Design  ................................ ................................ ................................ .....................  1 ...

--- Chunk 5 ---
Type: section
Chapter 1: INTRODUCT

In [2]:
import json
from typing import List, Dict

def chunks_to_jsonl(chunks: List[Dict], output_file: str = "kdhs_chunks.jsonl"):
    """
    Convert chunk dictionaries to JSONL format for RAG indexing.
    Each line becomes a JSON object with consistent fields.
    """
    with open(output_file, 'w', encoding='utf-8') as f_out:
        for i, chunk in enumerate(chunks):
            # Creating a clean document object
            doc = {
                'id': f'chunk_{i:04d}',
                'text': chunk['text'],
                'type': chunk['type']
            }
            
            # Adding chapter info if present
            if 'chapter_num' in chunk:
                doc['chapter_num'] = chunk['chapter_num']
                doc['chapter_title'] = chunk['chapter_title']
            
            # Adding section info if present
            if 'section_num' in chunk:
                doc['section_num'] = chunk['section_num']
                doc['section_title'] = chunk['section_title']
            
            # Adding a source field for filtering (like 'course' in the notebook)
            doc['source'] = 'kdhs_2022'
            
            # Writing as JSON line
            f_out.write(json.dumps(doc) + '\n')
    
    print(f"Saved {len(chunks)} chunks to {output_file}")
    
    # Showing a sample
    with open(output_file, 'r') as f:
        first_line = f.readline().strip()
        print("\nðŸ“„ Sample chunk:")
        print(json.dumps(json.loads(first_line), indent=2))


if __name__ == "__main__":
    # If you have chunks in memory from previous session:
    chunks_to_jsonl(chunks)
    
    # If you need to load from a file:
    # with open('chunks.json', 'r') as f:
    #     chunks = json.load(f)
    # chunks_to_jsonl(chunks)

Saved 1267 chunks to kdhs_chunks.jsonl

ðŸ“„ Sample chunk:
{
  "id": "chunk_0000",
  "text": "Kenya\nDemographic and \nHealth Survey 2022\nDemographic and Health Survey  \u2022  Volume 1\n Kenya 2022\nVolume 1 \n   \n \n \nKenya  \nDemographic and Health Survey  \n2022  \n \nVolume 1  \n \n \n \n \nKenya National Bureau of Statistics  \nNairobi, Kenya  \n \nMinistry of Health  \nNairobi, Kenya  \n \nThe DHS Program  \nICF \nRockville, Maryland, USA  \n \n \nJune  2023 \n \n \n \n \n \n \n \n \n    \n    \n    \n    \n   \n  \n \n The 2022 Kenya Demographic and Health Survey (2022 KDHS) was implemented by the Kenya National Bureau \nof Statistics (KNBS) in collaboration  with the Ministry of Health (MoH)  and other stakeholders . Funding for the \nsurvey was provided by the Government of K enya, the United States Agency for International Development \n(USAID), the Bill & Melinda Gates Foundation, the World Bank, the United Nations Children \u2019s Fund (UNICEF), \nthe United Nations Pop

In [4]:
es_client = Elasticsearch('http://localhost:9200')

# Test connection
if es_client.ping():
    print("Connected to Elasticsearch")
else:
    print("Cannot connect to Elasticsearch")
    print("Make sure Elasticsearch is running on http://localhost:9200")

Cannot connect to Elasticsearch
Make sure Elasticsearch is running on http://localhost:9200
