In [1]:
import json
import os
import re

from docling.document_converter import DocumentConverter
from dotenv import load_dotenv

load_dotenv()

True

In [2]:
# Initialize document converter
# A single module-level instance; convert_pdf_to_markdown() calls
# converter.convert(...) on it for every PDF.
converter = DocumentConverter()

# Configuration
# BASE_PATH: folder whose sub-folders each hold the chapter PDFs and a
# chapter_structure.json file (see process_folder / process_all_folders).
BASE_PATH = "/workspace/output/01"  # Change this to your base folder path
# Alternative output location, currently disabled:
# OUTPUT_PATH = "/workspace/output/02"  # Output path for chunks
# OUTPUT_PATH: root under which process_folder() writes
# <folder name>/chapter_<n>/ directories.
OUTPUT_PATH = "/workspace/output/"  # Output path for chunks

# Create output directory if it doesn't exist
# (exist_ok=True makes re-running this cell harmless)
os.makedirs(OUTPUT_PATH, exist_ok=True)

In [3]:
def load_chapter_structure(folder_path):
    """Read ``chapter_structure.json`` from ``folder_path``.

    Args:
        folder_path: Directory expected to contain ``chapter_structure.json``.

    Returns:
        The parsed JSON content, or ``None`` (after printing a notice) when
        the file does not exist. Malformed JSON is not handled here and
        propagates as the exception raised by ``json.load``.
    """
    structure_file = os.path.join(folder_path, "chapter_structure.json")

    # Guard clause: a missing file is reported, not raised.
    if not os.path.exists(structure_file):
        print(f"Chapter structure file not found in {folder_path}")
        return None

    with open(structure_file, "r", encoding="utf-8") as handle:
        return json.load(handle)


def convert_pdf_to_markdown(pdf_path):
    """Convert the PDF at ``pdf_path`` to a markdown string.

    Uses the module-level ``converter`` instance.

    Args:
        pdf_path: Path of the PDF file handed to ``converter.convert``.

    Returns:
        The markdown produced by ``export_to_markdown()``, or ``None`` when
        any exception is raised during conversion or export (the error is
        printed, not re-raised).
    """
    try:
        conversion = converter.convert(pdf_path)
        markdown_text = conversion.document.export_to_markdown()
    except Exception as e:
        # Best-effort: report the failure and let the caller skip this PDF.
        print(f"Error converting {pdf_path}: {str(e)}")
        return None
    return markdown_text


# def split_content_by_headings(content, headings):
#     """Split content into chunks based on headings."""
#     chunks = {}

#     # Find all heading positions
#     heading_positions = []
#     for heading in headings:
#         match = re.search(re.escape(heading), content, re.IGNORECASE | re.MULTILINE)
#         if match:
#             heading_positions.append((heading, match.start()))

#     # Sort by position
#     heading_positions.sort(key=lambda x: x[1])

#     # Create chunks
#     for i, (heading, start_pos) in enumerate(heading_positions):
#         # Determine end position
#         if i + 1 < len(heading_positions):
#             end_pos = heading_positions[i + 1][1]
#         else:
#             end_pos = len(content)

#         # Extract chunk
#         chunk_content = content[start_pos:end_pos].strip()
#         chunks[heading] = chunk_content

#     return chunks


def split_content_by_headings(content, headings):
    """Cut markdown ``content`` into sections at the given ``##`` headings.

    Each heading is looked up as a whole line of the form ``## <heading>``
    (case-insensitive, optional surrounding whitespace). Only the first such
    line per heading is used.

    Args:
        content: Markdown text to split.
        headings: Iterable of heading titles (without the ``##`` prefix).

    Returns:
        dict mapping each heading that was found to its section text. A
        section runs from its heading line up to the next found heading (or
        the end of ``content``) and is stripped of surrounding whitespace.
        Entries are in order of appearance in ``content``. Headings that are
        not found are omitted, and text before the first found heading is
        not part of any section.
    """
    flags = re.IGNORECASE | re.MULTILINE

    # Locate the first "## <title>" line for every requested title.
    located = []
    for title in headings:
        heading_re = re.compile(r"^##\s*" + re.escape(title) + r"\s*$", flags)
        hit = heading_re.search(content)
        if hit is not None:
            located.append((title, hit.start()))

    # Order the hits by where they occur in the document.
    located = sorted(located, key=lambda pair: pair[1])

    # Every section stops where the following one starts; the last section
    # runs to the end of the content.
    stops = [offset for _, offset in located[1:]]
    stops.append(len(content))

    sections = {}
    for (title, begin), stop in zip(located, stops):
        sections[title] = content[begin:stop].strip()

    return sections


def save_chunks(
    chunks, output_folder, chapter_num, full_chapter_content=None, generate_summary=True
):
    """Write each chunk to its own markdown file plus a ``chunk_info.json`` index.

    Files are written to ``<output_folder>/chapter_<chapter_num>/``, which is
    created if needed. Each chunk file is named
    ``chunk_<NN>_<sanitised heading>.md`` and starts with a ``# <heading>``
    title line followed by the chunk text.

    Args:
        chunks: Mapping of heading -> chunk text.
        output_folder: Parent directory for the chapter folder.
        chapter_num: Chapter identifier used in the folder name.
        full_chapter_content: Accepted for interface compatibility; not used
            by this function.
        generate_summary: Accepted for interface compatibility; not used by
            this function.

    Returns:
        dict mapping each heading to ``{"filename", "word_count",
        "char_count"}``; the same data is written to ``chunk_info.json``.
    """
    chapter_dir = os.path.join(output_folder, f"chapter_{chapter_num}")
    os.makedirs(chapter_dir, exist_ok=True)

    manifest = {}
    position = 0

    for heading, content in chunks.items():
        position += 1

        # Build a filesystem-friendly slug: drop punctuation, then collapse
        # runs of whitespace/hyphens into a single underscore.
        slug = re.sub(r"[-\s]+", "_", re.sub(r"[^\w\s-]", "", heading))
        filename = f"chunk_{position:02d}_{slug[:50]}.md"

        with open(os.path.join(chapter_dir, filename), "w", encoding="utf-8") as out:
            out.write(f"# {heading}\n\n" + content)

        manifest[heading] = {
            "filename": filename,
            "word_count": len(content.split()),
            "char_count": len(content),
        }

    # Persist the per-chunk statistics next to the chunk files.
    manifest_path = os.path.join(chapter_dir, "chunk_info.json")
    with open(manifest_path, "w", encoding="utf-8") as out:
        json.dump(manifest, out, indent=2, ensure_ascii=False)

    return manifest

In [4]:
def process_folder(folder_path, generate_summaries=True):
    """Process a single folder containing PDFs and chapter structure.

    Loads ``chapter_structure.json`` from ``folder_path`` and, for every
    ``chapter_num -> headings`` entry in it, converts ``<chapter_num>.pdf`` to
    markdown, splits the markdown at the listed headings, and writes the
    chunks plus the full chapter markdown to
    ``OUTPUT_PATH/<folder name>/chapter_<chapter_num>/``.

    Chapters are skipped (with a printed message) when the PDF is missing,
    the conversion yields no content, or no heading could be matched.

    Args:
        folder_path: Folder holding the chapter PDFs and
            ``chapter_structure.json``. A trailing path separator is
            tolerated.
        generate_summaries: Forwarded to ``save_chunks`` as its
            ``generate_summary`` argument.

    Returns:
        dict keyed by chapter number, each value holding ``total_chunks``,
        ``chunk_info`` and ``has_summary`` for a chapter that produced
        chunks (the dict may be empty if every chapter was skipped), or
        ``None`` when the chapter structure is missing or empty.
    """
    # os.path.basename() returns "" for a path that ends with a separator
    # (e.g. "/data/ES_24-25/"), which would drop every chapter directly into
    # OUTPUT_PATH and print an empty folder name. Normalising first removes
    # the trailing separator so the real folder name is used.
    folder_name = os.path.basename(os.path.normpath(folder_path))
    print(f"\n=== Processing folder: {folder_name} ===")

    # Load chapter structure
    chapter_structure = load_chapter_structure(folder_path)
    if not chapter_structure:
        return None

    results = {}

    # Process each chapter
    for chapter_num, headings in chapter_structure.items():
        pdf_path = os.path.join(folder_path, f"{chapter_num}.pdf")

        if not os.path.exists(pdf_path):
            print(f"PDF not found: {pdf_path}")
            continue

        print(f"\nProcessing Chapter {chapter_num}...")
        print(f"Headings to extract: {len(headings)}")

        # Convert PDF to markdown
        content = convert_pdf_to_markdown(pdf_path)
        if not content:
            print(f"Failed to convert PDF: {pdf_path}")
            continue
        print(f"Total content length: {len(content)} characters")

        # Split into chunks
        chunks = split_content_by_headings(content, headings)
        print(f"Successfully extracted {len(chunks)} chunks")

        if len(chunks) == 0:
            print(
                f"No chunks found for Chapter {chapter_num}. Check if headings match the PDF content."
            )
            continue

        # Save chunks with optional summary
        output_folder = os.path.join(OUTPUT_PATH, folder_name)
        chunk_info = save_chunks(
            chunks, output_folder, chapter_num, content, generate_summaries
        )

        # Save the full chapter markdown next to its chunks; save_chunks has
        # already created this chapter directory.
        markdown_folder = os.path.join(
            OUTPUT_PATH, folder_name, f"chapter_{chapter_num}"
        )
        markdown_file = os.path.join(markdown_folder, f"chapter_{chapter_num}.md")
        with open(markdown_file, "w", encoding="utf-8") as f:
            f.write(content)

        # Track results. A summary counts only when chunk_info carries a
        # "chapter_summary" entry that has no "error" key.
        has_summary = "chapter_summary" in chunk_info and "error" not in chunk_info.get(
            "chapter_summary", {}
        )
        results[chapter_num] = {
            "total_chunks": len(chunks),
            "chunk_info": chunk_info,
            "has_summary": has_summary,
        }

        # Print chunk statistics
        if has_summary:
            summary_word_count = chunk_info["chapter_summary"].get("word_count", 0)
            print(f"    Chapter summary: {summary_word_count} words")

        for heading, info in chunk_info.items():
            if heading != "chapter_summary":
                # Long headings are truncated to 60 characters for display.
                print(
                    f"  - {heading[:60]}{'...' if len(heading) > 60 else ''}: {info['word_count']} words"
                )

    return results


def process_all_folders(base_path, generate_summaries=True):
    """Run ``process_folder`` on every sub-directory of ``base_path``.

    Args:
        base_path: Directory whose immediate sub-directories are processed.
        generate_summaries: Passed through to ``process_folder``.

    Returns:
        dict mapping sub-directory name to the truthy result returned by
        ``process_folder``. Sub-directories that yield a falsy result or
        raise an exception are left out (exceptions are printed). An empty
        dict is returned when ``base_path`` does not exist or has no
        sub-directories.
    """
    if not os.path.exists(base_path):
        print(f"Base path does not exist: {base_path}")
        return {}

    # Keep only directory entries, in the order os.listdir() reports them.
    folders = []
    for entry in os.listdir(base_path):
        if os.path.isdir(os.path.join(base_path, entry)):
            folders.append(entry)

    if len(folders) == 0:
        print(f"No folders found in: {base_path}")
        return {}

    print(f"Found {len(folders)} folders to process: {folders}")

    all_results = {}
    for item in folders:
        try:
            results = process_folder(os.path.join(base_path, item), generate_summaries)
        except Exception as e:
            # One failing folder must not abort the remaining ones.
            print(f"Error processing folder {item}: {str(e)}")
            continue
        if results:
            all_results[item] = results

    return all_results

In [5]:
# Example: run the pipeline for one folder under BASE_PATH
single_folder_name = "ES_24-25"  # Change this to your folder name
single_folder_path = os.path.join(BASE_PATH, single_folder_name)

if not os.path.exists(single_folder_path):
    # Target folder is missing: report it and list what is available instead.
    print(f"Folder not found: {single_folder_path}")
    print(f"Available folders in {BASE_PATH}:")
    if not os.path.exists(BASE_PATH):
        print(f"Base path does not exist: {BASE_PATH}")
    else:
        available_folders = list(
            filter(
                lambda name: os.path.isdir(os.path.join(BASE_PATH, name)),
                os.listdir(BASE_PATH),
            )
        )
        for folder in available_folders:
            print(f"  - {folder}")
else:
    print(f"Processing single folder: {single_folder_name}")
    results = process_folder(single_folder_path, generate_summaries=True)
    if not results:
        print("No results generated - check your folder structure and files")
    else:
        print("\n=== Processing Complete ===")
        print(f"Results saved to: {OUTPUT_PATH}")
        print(f"Processed {len(results)} chapters")

Processing single folder: ES_24-25

=== Processing folder: ES_24-25 ===

Processing Chapter 1...
Headings to extract: 6
Total content length: 55405 characters
Successfully extracted 6 chunks
  - STATE OF THE ECONOMY: GETTING BACK INTO THE FAST LANE: 350 words
  - Introduction: 137 words
  - Global Economic Scenario: 1965 words
  - Domestic Economy Remains Steady Amidst Global Uncertainties: 2444 words
  - Economy Characterised by Stability and Inclusivity on Multip...: 2873 words
  - Outlook and Way Forward: 423 words

Processing Chapter 2...
Headings to extract: 6
Total content length: 123656 characters
Successfully extracted 5 chunks
  - Introduction: 244 words
  - Monetary Developments: 701 words
  - Financial Intermediation: 15015 words
  - Risks Pertaining to India's Financial Sector: 1246 words
  - Outlook: 604 words

Processing Chapter 3...
Headings to extract: 7
Total content length: 93861 characters
Successfully extracted 7 chunks
  - EXTERNAL SECTOR: GETTING FDI RIGHT: 301 wo

In [6]:
# # Process all folders in the base path
# print("Processing all folders...")
# all_results = process_all_folders(BASE_PATH, generate_summaries=True)

# if all_results:
#     # Save overall summary
#     summary_path = os.path.join(OUTPUT_PATH, "processing_summary.json")
#     with open(summary_path, "w", encoding="utf-8") as f:
#         json.dump(all_results, f, indent=2, ensure_ascii=False)

#     print("\n=== All Processing Complete ===")
#     print(f"Summary saved to: {summary_path}")
#     print(f"Processed {len(all_results)} folders")
# else:
#     print("No folders were successfully processed")

In [7]:
# # Inspect the results (only run if all_results exists)
# if "all_results" in locals() and all_results:
#     print("\n=== Processing Summary ===")
#     for folder_name, folder_results in all_results.items():
#         print(f"\nFolder: {folder_name}")
#         total_chunks = sum(
#             chapter_data["total_chunks"] for chapter_data in folder_results.values()
#         )
#         total_summaries = sum(
#             1
#             for chapter_data in folder_results.values()
#             if chapter_data.get("has_summary", False)
#         )
#         print(f"  Total chapters processed: {len(folder_results)}")
#         print(f"  Total chunks created: {total_chunks}")
#         print(f"  Chapter summaries generated: {total_summaries}")

#         for chapter_num, chapter_data in folder_results.items():
#             summary_status = "✓" if chapter_data.get("has_summary", False) else "✗"
#             print(
#                 f"    Chapter {chapter_num}: {chapter_data['total_chunks']} chunks, summary {summary_status}"
#             )
# else:
#     print("No results to inspect. Run the processing cells first.")

In [8]:
# # Optional: Extract and consolidate all chapter summaries
# def extract_all_summaries(base_output_path):
#     """Extract all chapter summaries from processed folders."""
#     if not os.path.exists(base_output_path):
#         print(f"Output path does not exist: {base_output_path}")
#         return {}

#     all_summaries = {}

#     for folder_name in os.listdir(base_output_path):
#         folder_path = os.path.join(base_output_path, folder_name)
#         if not os.path.isdir(folder_path):
#             continue

#         folder_summaries = {}

#         for item in os.listdir(folder_path):
#             if item.startswith("chapter_") and os.path.isdir(
#                 os.path.join(folder_path, item)
#             ):
#                 chapter_path = os.path.join(folder_path, item)
#                 info_file = os.path.join(chapter_path, "chunk_info.json")

#                 if os.path.exists(info_file):
#                     try:
#                         with open(info_file, "r", encoding="utf-8") as f:
#                             chunk_info = json.load(f)

#                         if (
#                             "chapter_summary" in chunk_info
#                             and "summary_text" in chunk_info["chapter_summary"]
#                         ):
#                             chapter_num = item.replace("chapter_", "")
#                             folder_summaries[chapter_num] = {
#                                 "summary": chunk_info["chapter_summary"][
#                                     "summary_text"
#                                 ],
#                                 "word_count": chunk_info["chapter_summary"].get(
#                                     "word_count", 0
#                                 ),
#                             }
#                     except Exception as e:
#                         print(f"Error reading {info_file}: {str(e)}")

#         if folder_summaries:
#             all_summaries[folder_name] = folder_summaries

#     return all_summaries


# # Extract summaries and save to a consolidated file
# summaries = extract_all_summaries(OUTPUT_PATH)

# if summaries:
#     # Save consolidated summaries
#     summaries_path = os.path.join(OUTPUT_PATH, "all_chapter_summaries.json")
#     with open(summaries_path, "w", encoding="utf-8") as f:
#         json.dump(summaries, f, indent=2, ensure_ascii=False)

#     print(f"\n=== Chapter Summaries Consolidated ===")
#     print(f"Summaries saved to: {summaries_path}")

#     # Display summary statistics
#     for folder_name, folder_summaries in summaries.items():
#         total_summary_words = sum(
#             summary["word_count"] for summary in folder_summaries.values()
#         )
#         print(
#             f"  {folder_name}: {len(folder_summaries)} chapters, {total_summary_words} total summary words"
#         )
# else:
#     print("No summaries found to consolidate. Process some chapters first.")