In [1]:
# PDF Processing and OCR Implementation

import io
import os
import fitz  # PyMuPDF
import pytesseract
from PIL import Image
import numpy as np
import glob
import asyncio
from pathlib import Path
from typing import List, Dict, Tuple, Any
import tempfile

# For vector store
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_community.vectorstores import FAISS
from pydantic import SecretStr

print("All libraries imported successfully!")

























All libraries imported successfully!


In [2]:
## Cell 3: Configuration Setup

# Configuration
GOOGLE_API_KEY = "add key here"  # Replace with your actual API key
SESSION_ID = "test_session_001"
OUTPUT_DIR = f"pdf_output/{SESSION_ID}"
VECTOR_STORE_DIR = f"faiss_index/{SESSION_ID}"

# Create directories
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs("faiss_index", exist_ok=True)

print(f"Output directory: {OUTPUT_DIR}")
print(f"Vector store directory: {VECTOR_STORE_DIR}")

Output directory: pdf_output/test_session_001
Vector store directory: faiss_index/test_session_001


In [3]:
## Cell 4: PDF Processing Functions

async def extract_text_from_pdfs(files, session_id=None):
    """
    First processes uploaded PDFs using PyMuPDF to generate text and image files in the output directory.
    Then processes all .txt and .png files in the output_dir.
    It reads text from .txt files and processes images from .png files (e.g., OCR).
    Returns all text concatenated, and a list of processed file names.
    """
    text = ""
    pdf_names = []
    output_dir = f"pdf_output/{session_id or 'default'}"
    os.makedirs(output_dir, exist_ok=True)
    ocr_text = ""

    # First, process uploaded PDF files to generate text and image files
    for file in files:
        pdf_names.append(file.filename)
        contents = await file.read()
        doc = fitz.open(stream=contents, filetype="pdf")
        
        for page_num, page in enumerate(doc.pages()):
            # --- Extract selectable text ---
            page_text = page.get_text()
            text_file_path = os.path.join(output_dir, f"{file.filename}_page{page_num+1}_text.txt")
            
            if page_text.strip():
                with open(text_file_path, "w", encoding="utf-8") as f:
                    f.write(page_text)
                
            # --- Extract images from the page ---
            img_list = page.get_images(full=True)
            for img_num, img in enumerate(img_list):
                xref = img[0]
                base_image = doc.extract_image(xref)
                img_bytes = base_image["image"]
                ext = base_image["ext"]
                img_path = os.path.join(output_dir, f"{file.filename}_page{page_num+1}_img{img_num+1}.{ext}")
                with open(img_path, "wb") as img_file:
                    img_file.write(img_bytes)
        
        doc.close()

    # Now process all .txt files in the output_dir
    txt_files = glob.glob(os.path.join(output_dir, "*.txt"))
    for txt_file in txt_files:
        with open(txt_file, "r", encoding="utf-8") as f:
            file_text = f.read()
            text += file_text + "\n"

    # Process all .jpeg files in the output_dir (OCR)
    jpeg_files = glob.glob(os.path.join(output_dir, "*.jpeg"))
    ocr_full_text = ""
    for jpeg_file in jpeg_files:
        img = Image.open(jpeg_file)
        # Convert to grayscale if needed
        gray_img = img.convert('L')
        ocr_result = pytesseract.image_to_string(gray_img)
        text += ocr_result + "\n"
        ocr_full_text += f"File: {os.path.basename(jpeg_file)}\n{ocr_result}\n\n"

    # Write all OCR text to a single file in the output directory
    ocr_full_text_path = os.path.join(output_dir, "ocr_full_text.txt")
    with open(ocr_full_text_path, "w", encoding="utf-8") as f:
        f.write(ocr_full_text)

    return ocr_text, text, pdf_names

print("PDF processing functions defined successfully!")


PDF processing functions defined successfully!


In [None]:

## Cell 5: Vector Store Utility Functions

def chunk_text_with_metadata(source_texts):
    """
    Accepts a dict of {source_id: text} and returns (chunks, metadatas)
    """
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=10000, chunk_overlap=1000)
    all_chunks = []
    all_metadatas = []
    for source_id, text in source_texts:
        # Optionally, parse source_id for file/page info
        chunks = text_splitter.split_text(text)
        all_chunks.extend(chunks)
        # Attach metadata to each chunk
        for i, chunk in enumerate(chunks):
            print(f"source={source_id}, chunk_index={i}, chunk_length={len(chunk)}")
            meta = {"source": source_id, "chunk_index": i}
            all_metadatas.append(meta)
    return all_chunks, all_metadatas

def chunk_text(text):
    """
    Backward compatible chunk_text
    """
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=600, chunk_overlap=200)
    chunks = text_splitter.split_text(text)
    print(chunks)
    return chunks

def create_vector_store(chunks, session_id, metadatas=None):
    """
    Create and save a FAISS vector store
    """
    embedding_model = GoogleGenerativeAIEmbeddings(
        model="models/gemini-embedding-001", 
        google_api_key=SecretStr(GOOGLE_API_KEY)
    )
    if metadatas is not None:
        vector_store = FAISS.from_texts(chunks, embedding=embedding_model, metadatas=metadatas)
    else:
        vector_store = FAISS.from_texts(chunks, embedding=embedding_model)
    
    save_path = f"faiss_index/{session_id}"
    vector_store.save_local(save_path)
    print(f"Vector store saved to: {save_path}")
    return vector_store

def load_vector_store(session_id):
    """
    Load an existing FAISS vector store
    """
    embedding_model = GoogleGenerativeAIEmbeddings(
        model="models/gemini-embedding-001", 
        google_api_key=SecretStr(GOOGLE_API_KEY)
    )
    path = f"faiss_index/{session_id}"
    if not os.path.exists(path):
        raise Exception(f"No vector store found for session: {session_id}")
    return FAISS.load_local(path, embedding_model, allow_dangerous_deserialization=True)

print("Vector store utility functions defined successfully!")

Vector store utility functions defined successfully!


In [5]:
## Cell 6: Mock File Class for Testing

class MockFile:
    """Mock file class to simulate uploaded files for testing"""
    def __init__(self, filename, content):
        self.filename = filename
        self.content = content
    
    async def read(self):
        return self.content

def create_sample_pdf():
    """Create a simple PDF for testing purposes"""
    import fitz
    
    # Create a new PDF document
    pdf_path = "Trademarks.pdf"
    doc = fitz.open(pdf_path)
    
    # Add some text
    text = """
    This is a sample PDF document for testing.
    
    It contains multiple lines of text to demonstrate
    the PDF processing capabilities.
    
    This text should be extractable using PyMuPDF.
    """
    

    
    # Save to bytes
    pdf_bytes = doc.tobytes()
    doc.close()
    
    return pdf_bytes

# Create sample PDF
sample_pdf_bytes = create_sample_pdf()
print(f"Sample PDF created with {len(sample_pdf_bytes)} bytes")

Sample PDF created with 4057365 bytes


In [6]:
## Cell 7: Test PDF Processing

async def test_pdf_processing():
    """Test the PDF processing functionality"""
    print("Testing PDF processing...")
    
    # Create mock file
    mock_file = MockFile("Trademarks.pdf", sample_pdf_bytes)
    files = [mock_file]
    
    try:
        # Process the PDF
        ocr_text, extracted_text, pdf_names = await extract_text_from_pdfs(files, SESSION_ID)
        
        print(f"Processed PDFs: {pdf_names}")
        print(f"Extracted text length: {len(extracted_text)}")
        print(f"OCR text length: {len(ocr_text)}")
        print("\n--- Sample of extracted text ---")
        print(extracted_text[:2000] + "..." if len(extracted_text) > 2000 else extracted_text)
        
        return extracted_text, pdf_names
        
    except Exception as e:
        print(f"Error in PDF processing: {e}")
        return None, None

# Run the test
extracted_text, pdf_names = await test_pdf_processing()

Testing PDF processing...
Processed PDFs: ['Trademarks.pdf']
Extracted text length: 24268
OCR text length: 0

--- Sample of extracted text ---
File: Trademarks.pdf_page10_img1.jpeg


File: Trademarks.pdf_page10_img2.jpeg
WHAT IS A COINED WORD?

Coined words are invented words, the words which do not have any
meaning and have advantage of protection easily.

As in the name of “Venron (Pvt) Limited”, the coined word is
“VYenron” which is an invented word and do not have any meaning.

WHAT ARE SERVICE MARKS?

Service mark is a name given to trademarks registered to
distinguish the services of an enterprise from those of others.

Il. According to Trademarks Ordinance 2001, service means, service
of any description which is made available to users or potential

user and includes the provision for services in connection with
business of any industrial or commercial nature.

Ill. Examples include:

Telecommunication, education, law, financing, insurance, real
estate, transport, processing, su

In [10]:
## Cell 8: Test Text Chunking

def test_text_chunking():
    """Test text chunking functionality"""
    print("Testing text chunking...")
    
    if extracted_text:
        # Test simple chunking
        chunks = chunk_text(extracted_text)
        print(f"Number of chunks (simple): {len(chunks)}")
        
        # Test chunking with metadata
        source_texts = {
            "sample_test.pdf_page1": extracted_text,
            "sample_test.pdf_page2": extracted_text  # Duplicate for testing
        }
        
#        chunks_with_meta, metadatas = chunk_text_with_metadata(extracted_text)
#        print(f"Number of chunks (with metadata): {len       (chunks_with_meta)}")
        # print(f"Number of metadata entries: {len(metadatas)}")
        
        # if metadatas:
        #     print("\n--- Sample metadata ---")
        for i,text in enumerate(chunks[:30]):  # Show first 3
            print(f"Chunk {i}: {text}")
        


# Run the test
chunks=test_text_chunking()
#chunks_with_meta, metadatas = test_text_chunking()

Testing text chunking...
['File: Trademarks.pdf_page10_img1.jpeg\n\n\nFile: Trademarks.pdf_page10_img2.jpeg\nWHAT IS A COINED WORD?\n\nCoined words are invented words, the words which do not have any\nmeaning and have advantage of protection easily.\n\nAs in the name of “Venron (Pvt) Limited”, the coined word is\n“VYenron” which is an invented word and do not have any meaning.\n\nWHAT ARE SERVICE MARKS?\n\nService mark is a name given to trademarks registered to\ndistinguish the services of an enterprise from those of others.', 'Il. According to Trademarks Ordinance 2001, service means, service\nof any description which is made available to users or potential\n\nuser and includes the provision for services in connection with\nbusiness of any industrial or commercial nature.\n\nIll. Examples include:\n\nTelecommunication, education, law, financing, insurance, real\nestate, transport, processing, supply of goods, lodging,\nentertainment, amusement, construction, repair, conveying of news

In [None]:
## Cell 9: Test Vector Store Creation (Requires API Key)

def test_vector_store_creation():
    """Test vector store creation and operations"""
    print("Testing vector store creation...")
    
    if not GOOGLE_API_KEY or GOOGLE_API_KEY == "your-google-api-key-here":
        print("⚠️  Google API key not configured. Skipping vector store test.")
        print("Please set GOOGLE_API_KEY to test vector store functionality.")
        return None
    
    if chunks_with_meta and metadatas:
        try:
            # Create vector store
            vector_store = create_vector_store(chunks_with_meta, SESSION_ID, metadatas)
            print("✅ Vector store created successfully!")
            
            # Test similarity search
            query = "sample PDF document"
            results = vector_store.similarity_search(query, k=2)
            
            print(f"\n--- Similarity search results for '{query}' ---")
            for i, doc in enumerate(results):
                print(f"Result {i+1}:")
                print(f"Content: {doc.page_content[:200]}...")
                print(f"Metadata: {doc.metadata}")
                print()
            
            return vector_store
            
        except Exception as e:
            print(f"❌ Error creating vector store: {e}")
            return None
    else:
        print("No chunks available for vector store creation")
        return None

# Run the test
vector_store = test_vector_store_creation()

In [None]:



## Cell 10: Test Vector Store Loading

def test_vector_store_loading():
    """Test loading an existing vector store"""
    print("Testing vector store loading...")
    
    if not GOOGLE_API_KEY or GOOGLE_API_KEY == "your-google-api-key-here":
        print("⚠️  Google API key not configured. Skipping vector store loading test.")
        return None
    
    try:
        loaded_vector_store = load_vector_store(SESSION_ID)
        print("✅ Vector store loaded successfully!")
        
        # Test the loaded vector store
        query = "testing document"
        results = loaded_vector_store.similarity_search(query, k=1)
        
        print(f"\n--- Search results from loaded vector store ---")
        if results:
            doc = results[0]
            print(f"Content: {doc.page_content[:200]}...")
            print(f"Metadata: {doc.metadata}")
        
        return loaded_vector_store
        
    except Exception as e:
        print(f"❌ Error loading vector store: {e}")
        return None

# Run the test
loaded_vector_store = test_vector_store_loading()

In [None]:
## Cell 11: File System Inspection

def inspect_output_files():
    """Inspect the files created during processing"""
    print("Inspecting output files...")
    
    # Check output directory
    if os.path.exists(OUTPUT_DIR):
        files = os.listdir(OUTPUT_DIR)
        print(f"\nFiles in {OUTPUT_DIR}:")
        for file in files:
            file_path = os.path.join(OUTPUT_DIR, file)
            size = os.path.getsize(file_path)
            print(f"  - {file} ({size} bytes)")
    else:
        print(f"Output directory {OUTPUT_DIR} does not exist")
    
    # Check vector store directory
    if os.path.exists(VECTOR_STORE_DIR):
        files = os.listdir(VECTOR_STORE_DIR)
        print(f"\nFiles in {VECTOR_STORE_DIR}:")
        for file in files:
            file_path = os.path.join(VECTOR_STORE_DIR, file)
            size = os.path.getsize(file_path)
            print(f"  - {file} ({size} bytes)")
    else:
        print(f"Vector store directory {VECTOR_STORE_DIR} does not exist")

inspect_output_files()

In [None]:
## Cell 12: Performance and Error Handling Tests

async def test_error_handling():
    """Test error handling scenarios"""
    print("Testing error handling...")
    
    # Test with empty file list
    try:
        ocr_text, text, names = await extract_text_from_pdfs([], SESSION_ID)
        print("✅ Empty file list handled correctly")
    except Exception as e:
        print(f"❌ Error with empty file list: {e}")
    
    # Test with invalid PDF
    try:
        invalid_file = MockFile("invalid.pdf", b"not a pdf")
        ocr_text, text, names = await extract_text_from_pdfs([invalid_file], SESSION_ID)
        print("⚠️  Invalid PDF processed (might need better validation)")
    except Exception as e:
        print(f"✅ Invalid PDF properly rejected: {e}")
    
    # Test chunking with empty text
    try:
        chunks = chunk_text("")
        print(f"✅ Empty text chunking handled: {len(chunks)} chunks")
    except Exception as e:
        print(f"❌ Error with empty text chunking: {e}")

# Run error handling tests
await test_error_handling()

In [None]:
## Cell 13: Cleanup and Summar

def cleanup_test_files():
    """Clean up test files (optional)"""
    import shutil
    
    response = input("Do you want to clean up test files? (y/n): ").lower()
    if response == 'y':
        try:
            if os.path.exists(f"pdf_output/{SESSION_ID}"):
                shutil.rmtree(f"pdf_output/{SESSION_ID}")
                print(f"✅ Cleaned up {OUTPUT_DIR}")
            
            if os.path.exists(f"faiss_index/{SESSION_ID}"):
                shutil.rmtree(f"faiss_index/{SESSION_ID}")
                print(f"✅ Cleaned up {VECTOR_STORE_DIR}")
                
        except Exception as e:
            print(f"❌ Error during cleanup: {e}")
    else:
        print("Files preserved for inspection")

def print_summary():
    """Print test summary"""
    print("\n" + "="*60)
    print("TESTING SUMMARY")
    print("="*60)
    print("✅ PDF processing functions defined")
    print("✅ Vector store utilities defined")
    print("✅ Mock file system created")
    print("✅ PDF text extraction tested")
    print("✅ Text chunking tested")
    
    if GOOGLE_API_KEY != "your-google-api-key-here":
        print("✅ Vector store creation tested")
        print("✅ Vector store loading tested")
    else:
        print("⚠️  Vector store tests skipped (API key needed)")
    
    print("✅ Error handling tested")
    print("✅ File system inspection completed")
    print("="*60)

print_summary()
cleanup_test_files()