Installing the PDF library

In [None]:
!pip install pdfplumber

Importing libraries

In [None]:
import pdfplumber
import re
from pprint import pprint

Text Extraction Function

In [None]:
def extract_text_with_tables(pdf_path):
    """
    Extract the text and tables of every page of a PDF using pdfplumber.

    Page text has every whitespace run collapsed to a single space. Each
    table is rendered as a ``[TABLE n]:`` header followed by one line per
    row, with the cells joined by `` | ``. Whitespace inside a cell
    (including line breaks from wrapped cell text) is collapsed as well, so
    a table row always occupies exactly one line.

    Args:
        pdf_path: Path of the PDF file, passed to ``pdfplumber.open``.

    Returns:
        A list with one dict per page, in page order, with the keys
        ``page_number`` (1-based), ``text`` (page text followed by the
        rendered tables), ``table_count`` and ``has_tables``.
    """
    print(f"📄 Extracting from: {pdf_path}")
    extracted_data = []

    with pdfplumber.open(pdf_path) as pdf:
        total_pages = len(pdf.pages)
        print(f"Found {total_pages} pages")

        for page_num, page in enumerate(pdf.pages, 1):
            # extract_text() may return None for pages without a text layer
            text = page.extract_text() or ""
            text = re.sub(r'\s+', ' ', text).strip()

            tables = page.extract_tables()
            table_texts = []

            for table_num, table in enumerate(tables, 1):
                lines = [f"\n[TABLE {table_num}]:"]
                for row in table:
                    # Empty/None cells become ""; embedded newlines are
                    # flattened so they cannot split a row across lines.
                    clean_row = [
                        re.sub(r'\s+', ' ', str(cell)).strip() if cell else ""
                        for cell in row
                    ]
                    lines.append(" | ".join(clean_row))
                table_texts.append("\n".join(lines) + "\n")

            # Page text first, then every rendered table
            full_content = text + "\n" + "\n".join(table_texts)

            extracted_data.append({
                'page_number': page_num,
                'text': full_content,
                'table_count': len(tables),
                'has_tables': len(tables) > 0
            })

            print(f"✓ Page {page_num}: {len(tables)} tables extracted")

    print(f"✅ Finished! Extracted {len(extracted_data)} pages")
    return extracted_data

Text Extraction

In [None]:
# Extract text and tables from the source PDF.
# document_data is the per-page list returned by extract_text_with_tables:
# one dict per page with 'page_number', 'text', 'table_count', 'has_tables'.
pdf_path = "data.pdf"
document_data = extract_text_with_tables(pdf_path)

In [None]:
# Summarize the extraction: page totals plus a short preview of page one
table_page_total = len([entry for entry in document_data if entry['has_tables']])

print("\n📊 Extraction Summary:")
print(f"Total pages: {len(document_data)}")
print(f"Pages with tables: {table_page_total}")

# Preview at most 300 characters of the first page, marking truncation
if document_data:
    first_page = document_data[0]
    preview = first_page['text']
    if len(preview) > 300:
        preview = preview[:300] + "..."
    print(f"\n📝 Page {first_page['page_number']} preview:")
    print("=" * 50)
    print(preview)

In [None]:
# Find pages that have tables
pages_with_tables = [page for page in document_data if page['has_tables']]

if pages_with_tables:
    print(f"\n🔍 Found {len(pages_with_tables)} pages with tables:")
    for page in pages_with_tables[:2]:  # Show first 2 pages with tables
        print(f"\nPage {page['page_number']} ({page['table_count']} tables):")
        print("-" * 30)
        # Preview the first rendered table: everything from the first
        # '[TABLE' marker onwards, capped at 200 characters. (Splitting on
        # ']' would leave only the table number, not its content.)
        _, marker, remainder = page['text'].partition('[TABLE')
        if marker:
            table_part = (marker + remainder)[:200].strip()
        else:
            table_part = "No table marker found"
        print(f"Contains: {table_part}...")
else:
    print("No tables found in document")

Important Keywords (like budget, debt, or infrastructure details)

In [None]:
# ====== IDENTIFY IMPORTANT FINANCIAL INFORMATION ======
# (Add this after Task 1 extraction is done, before Task 2)

def identify_important_info(document_data):
    """
    Tag each extracted page with the financial topics its text mentions.

    A topic applies to a page when any of the topic's keywords occurs as a
    case-insensitive substring of the page's 'text'. The input dicts are not
    modified; each returned dict is a shallow copy of the input page with an
    added 'financial_topics' list (topics in keyword-table order).
    """
    print("🏷️ Tagging important financial information...")

    financial_keywords = {
        'budget': ['budget', 'surplus', 'deficit', 'operating result', 'revenue', 'expense'],
        'debt': ['debt', 'borrowing', 'liability', 'interest', 'loan', 'borrowings'],
        'infrastructure': ['infrastructure', 'capital works', 'assets', 'property', 'plant', 'equipment'],
        'taxation': ['tax', 'taxation', 'gsp', 'revenue', 'tax burden'],
        'superannuation': ['superannuation', 'pension', 'retirement', 'liabilities'],
        'financial_policy': ['financial policy', 'objective', 'strategy', 'principle'],
        'service_delivery': ['service delivery', 'health', 'education', 'community']
    }

    tagged_data = []

    for entry in document_data:
        lowered = entry['text'].lower()

        # Topics whose keyword list has at least one hit on this page
        matched_topics = [
            topic
            for topic, keywords in financial_keywords.items()
            if any(keyword in lowered for keyword in keywords)
        ]

        # Copy the page and attach the tags without touching the input
        tagged_data.append({**entry, 'financial_topics': matched_topics})

        if matched_topics:
            print(f"✓ Page {entry['page_number']}: {', '.join(matched_topics)}")

    return tagged_data

# Tag every page, replacing document_data with the tagged copies
print("🔍 Identifying important financial information...")
document_data = identify_important_info(document_data)

# Collect the distinct topics seen anywhere in the document
print("\n📊 Important topics found:")
all_topics = set()
for page in document_data:
    all_topics |= set(page['financial_topics'])

topic_summary = ', '.join(sorted(all_topics))
print(f"Financial concepts identified: {topic_summary}")

# Confirm the first page now carries the 'financial_topics' key
print("\n🧮 Sample page structure now includes 'financial_topics':")
if document_data:
    sample_page = document_data[0]
    print(f"Page 1 keys: {list(sample_page)}")
    print(f"Page 1 topics: {sample_page['financial_topics']}")

Task 2 
Process Text

In [None]:
# Create the Chunking Function
def chunk_document_text(extracted_data, sentences_per_chunk=4):
    """
    Split page text into sentence-based chunks that carry page metadata.

    Each page's text is split into sentences, which are grouped into runs of
    ``sentences_per_chunk``. A chunk never spans two pages. Chunks of 50
    characters or fewer are dropped; chunk numbering still counts them, so
    ``chunk_id`` values can have gaps.

    Args:
        extracted_data: Iterable of page dicts with the keys 'page_number',
            'text', 'has_tables' and 'table_count'. 'financial_topics' is
            optional; pages without it are chunked with an empty topic list,
            so untagged extraction output is accepted too.
        sentences_per_chunk: Number of sentences per chunk (must be >= 1).

    Returns:
        A list of chunk dicts with the keys 'text', 'page_number',
        'financial_topics', 'chunk_id', 'has_tables' and 'table_count'.

    Raises:
        ValueError: If ``sentences_per_chunk`` is smaller than 1.
    """
    if sentences_per_chunk < 1:
        raise ValueError("sentences_per_chunk must be a positive integer")

    print("✂️ Chunking document text into search-optimized pieces...")

    # Split on whitespace that follows '.' or '?', except after patterns
    # shaped like "e.g." or "Mr." (compiled once, reused for every page).
    sentence_boundary = re.compile(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s')
    chunks_with_metadata = []
    page_count = 0

    for page_data in extracted_data:
        page_count += 1
        page_num = page_data['page_number']
        financial_topics = page_data.get('financial_topics', [])
        sentences = sentence_boundary.split(page_data['text'])

        for start in range(0, len(sentences), sentences_per_chunk):
            chunk_text = ' '.join(sentences[start:start + sentences_per_chunk]).strip()

            if len(chunk_text) <= 50:  # Skip empty and very short chunks
                continue

            chunks_with_metadata.append({
                'text': chunk_text,
                'page_number': page_num,
                # Per-chunk copy so editing one chunk's tags cannot leak
                # into sibling chunks or back into the page dict.
                'financial_topics': list(financial_topics),
                'chunk_id': f"page{page_num}_chunk{start // sentences_per_chunk + 1}",
                'has_tables': page_data['has_tables'],
                'table_count': page_data['table_count']
            })

    print(f"✅ Created {len(chunks_with_metadata)} chunks from {page_count} pages")
    return chunks_with_metadata

In [None]:
# Process your extracted text into optimized chunks.
# document_data is the tagged page list produced by identify_important_info;
# chunk_document_text is called with its default of 4 sentences per chunk.
document_chunks = chunk_document_text(document_data)

# Report how many chunks were produced
print(f"\n📊 Chunking completed!")
print(f"Total chunks created: {len(document_chunks)}")

In [None]:
# Inspect a few of the chunks that were just created
print("🔍 Sample of created chunks (with financial tags):")
print("=" * 70)

# Keep the first three chunks that carry at least one financial tag
sample_chunks = [chunk for chunk in document_chunks if chunk['financial_topics']][:3]

for position, chunk in enumerate(sample_chunks, start=1):
    print(f"\nChunk {position}:")
    print(f"Page {chunk['page_number']} | Topics: {chunk['financial_topics']}")
    print(f"Content: {chunk['text'][:100]}...")
    print("-" * 50)

In [None]:
# Count how many chunks carry each financial topic
print("📈 Chunk distribution by financial topic:")
print("=" * 40)

topic_distribution = {}
for chunk in document_chunks:
    for topic in chunk['financial_topics']:
        if topic in topic_distribution:
            topic_distribution[topic] += 1
        else:
            topic_distribution[topic] = 1

# Most frequent topic first; ties keep their first-seen order
ranked_topics = sorted(topic_distribution.items(), key=lambda pair: -pair[1])
for topic, count in ranked_topics:
    print(f"{topic}: {count} chunks")

table_chunk_total = len([chunk for chunk in document_chunks if chunk['has_tables']])
print(f"\nChunks with tables: {table_chunk_total}")

In [None]:
# Quick quality check
print("\n🧪 Quality Check:")
print("=" * 30)

# Check average chunk length (reported as 0 when no chunks exist, instead of
# dividing by zero)
if document_chunks:
    avg_length = sum(len(chunk['text']) for chunk in document_chunks) / len(document_chunks)
else:
    avg_length = 0.0
print(f"Average chunk length: {avg_length:.0f} characters")

# Check chunks are properly tagged
untagged_chunks = sum(1 for chunk in document_chunks if not chunk['financial_topics'])
print(f"Untagged chunks: {untagged_chunks}")

# Show an example chunk that comes from a page containing tables
table_chunks = [chunk for chunk in document_chunks if chunk['has_tables']]
if table_chunks:
    print(f"\nExample table chunk (Page {table_chunks[0]['page_number']}):")
    print(table_chunks[0]['text'][:150] + "...")

Saving the processed data

In [None]:
# ====== SAVE PROCESSED DATA ======
import pickle

def save_processed_data(chunks, filename="financial_chunks.pkl"):
    """Pickle ``chunks`` into ``filename`` (overwriting it) and report the count."""
    with open(filename, 'wb') as handle:
        handle.write(pickle.dumps(chunks))
    chunk_total = len(chunks)
    print(f"💾 Saved {chunk_total} chunks to {filename}")

# Save your hard work!
save_processed_data(document_chunks)

# Optional: Quick verification (skipped when there are no chunks, since
# indexing document_chunks[0] would raise IndexError)
if document_chunks:
    print(f"📦 Sample chunk structure:")
    print(f"Keys: {list(document_chunks[0].keys())}")
    print(f"First topic: {document_chunks[0]['financial_topics']}")
else:
    print("⚠️ No chunks were created; nothing to sample.")

Building the Vector Database

In [None]:
!pip install chromadb sentence-transformers

In [None]:
import chromadb
from sentence_transformers import SentenceTransformer
import numpy as np

In [None]:
# Initialize the embedding model (creates numerical representations of text).
# embedding_model is used further down to encode both the chunk texts and the
# search queries.
# NOTE(review): presumably fetches the model weights on first use — confirm.
print("🔄 Loading embedding model...")
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
print("✅ Embedding model loaded!")

In [None]:
!pip install chromadb --upgrade
!pip install sentence-transformers

In [None]:


# Initialize embedding model
print("🔄 Loading embedding model...")
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
print("✅ Embedding model loaded!")

# Initialize ChromaDB with simpler configuration
print("🗄️ Initializing vector database...")
client = chromadb.Client()  # Use default in-memory client for simplicity

# Fetch the collection if it already exists, otherwise create it.
# create_collection raises when "financial_policy" is already present, which
# made this cell fail whenever it was run a second time in the same session.
# The metadata selects cosine distance for the HNSW index.
collection = client.get_or_create_collection(
    name="financial_policy",
    metadata={"hnsw:space": "cosine"}
)

print("✅ Vector database initialized!")

In [None]:
def prepare_for_vector_db(chunks):
    """
    Turn chunk dicts into the three parallel lists the vector store expects.

    Returns ``(texts, metadatas, ids)``, index-aligned with ``chunks``:
    the chunk text, a flat metadata dict (the topic list is stored as its
    ``str()`` form), and a positional id of the form ``chunk_<index>``.
    """
    print("📦 Preparing data for vector database...")

    # Single pass over the input; each record is (text, metadata, id)
    records = [
        (
            chunk['text'],
            {
                'page_number': chunk['page_number'],
                'financial_topics': str(chunk['financial_topics']),  # Stored as a string for ChromaDB
                'chunk_id': chunk['chunk_id'],
                'has_tables': chunk['has_tables']
            },
            f"chunk_{position}",
        )
        for position, chunk in enumerate(chunks)
    ]

    texts = [record[0] for record in records]
    metadatas = [record[1] for record in records]
    ids = [record[2] for record in records]

    return texts, metadatas, ids

# Prepare your chunks: texts, metadatas and ids are three parallel lists
# (same length, same order as document_chunks) returned by
# prepare_for_vector_db.
texts, metadatas, ids = prepare_for_vector_db(document_chunks)
print(f"✅ Prepared {len(texts)} chunks for database insertion")

In [None]:
print("🧠 Creating embeddings... (this may take a moment)")
# Process in batches to avoid memory issues
batch_size = 50
all_embeddings = []

# Ceiling division: the exact number of batches, also when len(texts) is a
# multiple of batch_size (len // batch_size + 1 over-reported by one there).
total_batches = (len(texts) + batch_size - 1) // batch_size

for i in range(0, len(texts), batch_size):
    batch_texts = texts[i:i + batch_size]
    batch_embeddings = embedding_model.encode(batch_texts).tolist()
    all_embeddings.extend(batch_embeddings)
    print(f"Processed batch {i//batch_size + 1}/{total_batches}")

if texts:
    print("💾 Adding chunks to vector database...")
    collection.add(
        embeddings=all_embeddings,
        documents=texts,
        metadatas=metadatas,
        ids=ids
    )
    print("✅ Vector database populated!")
else:
    # Nothing to insert: skip the add call rather than pass it empty lists
    print("⚠️ No chunks to add; vector database left unchanged.")

print(f"📊 Total chunks stored: {collection.count()}")

In [None]:
def test_semantic_search(query, n_results=3):
    """
    Run one semantic search against the collection and print the hits.

    The query is encoded with ``embedding_model``; each hit is printed with
    its page number, similarity (1 - distance), topics and the first 100
    characters of its text. Any exception raised while querying or printing
    is reported on stdout instead of propagating. Always returns None.
    """
    print(f"\n🔍 Testing search: '{query}'")

    # Encode the query with the same model that embedded the chunks
    query_embedding = embedding_model.encode([query]).tolist()

    try:
        results = collection.query(
            query_embeddings=query_embedding,
            n_results=n_results,
            include=['metadatas', 'documents', 'distances']
        )

        documents = results['documents'][0]
        metadatas = results['metadatas'][0]
        print(f"Found {len(documents)} results:")

        for index, (doc, metadata) in enumerate(zip(documents, metadatas)):
            similarity = 1 - results['distances'][0][index]
            print(f"\n{index+1}. Page {metadata['page_number']} (Similarity: {similarity:.3f})")
            print(f"   Topics: {metadata['financial_topics']}")
            print(f"   Content: {doc[:100]}...")

    except Exception as error:
        print(f"❌ Search error: {error}")
        return None

# Sample queries used to smoke-test the semantic search
test_queries = ["budget deficit 2005", "debt levels", "infrastructure investment"]

separator = "-" * 80
for query in test_queries:
    test_semantic_search(query)
    print(separator)

Search Test

In [None]:
# Quick verification
print("🧪 Final verification:")
print(f"Collection name: {collection.name}")
print(f"Total chunks: {collection.count()}")

qa = input("Enter your query: ")

# Try a simple search
results = collection.query(
    query_texts=[qa],
    n_results=2
)
print(f"Sample search works: {len(results['documents'][0])} results found")

In [None]:
# @title Query Input Cell (Run this cell to get input)
from IPython.display import display
import ipywidgets as widgets

# Build the query text box and the search button
query_layout = widgets.Layout(width='80%')
query_input = widgets.Text(
    value='',
    placeholder='Enter your financial query...',
    description='Query:',
    layout=query_layout
)
search_button = widgets.Button(description="Search", button_style='success')

# Render both widgets, text box first
for widget in (query_input, search_button):
    display(widget)

# Raw query results are appended here by the button callback
search_results = []

def on_search_clicked(b):
    """
    Button callback: search the collection for the text in ``query_input``.

    The query is encoded with ``embedding_model`` (the model that embedded
    the stored chunks) and the top 3 matches are fetched. The raw result is
    appended to ``search_results`` and rendered with
    ``display_search_results``. A blank query only prints a hint.

    Args:
        b: The clicked button instance passed by ipywidgets (unused).
    """
    query = query_input.value.strip()
    if not query:
        print("❌ Please enter a query")
        return

    print(f"🔍 Searching for: '{query}'")
    results = collection.query(
        # Explicit embeddings keep the query in the same vector space as the
        # stored documents; query_texts would use the collection's own
        # embedding function instead.
        query_embeddings=embedding_model.encode([query]).tolist(),
        n_results=3,
        include=['metadatas', 'documents', 'distances']
    )
    search_results.append(results)
    display_search_results(results)

search_button.on_click(on_search_clicked)

def display_search_results(results):
    """
    Render query results as HTML cards in the notebook output.

    Each card shows the hit's rank, page number, similarity (1 - distance,
    or 0 when no distances were returned), topics and the first 150
    characters of its text. Text and topics are HTML-escaped because they
    originate from the PDF and may contain characters such as '<' or '&'.
    Prints a "no results" message when the result set is missing or empty.

    Args:
        results: The dict returned by ``collection.query`` with at least the
            'documents', 'metadatas' and 'distances' entries.
    """
    import html
    from IPython.display import HTML, display

    if not (results and results['documents'] and results['documents'][0]):
        print("❌ No results found")
        return

    documents = results['documents'][0]
    metadatas = results['metadatas'][0]
    html_output = f"<h3>📄 Found {len(documents)} results:</h3>"

    for i, (doc, metadata) in enumerate(zip(documents, metadatas)):
        similarity = 1 - results['distances'][0][i] if results['distances'] else 0
        # Escape everything that comes from the document before embedding it
        # in markup
        snippet = html.escape(doc[:150])
        topics = html.escape(str(metadata['financial_topics']))
        html_output += f"""
            <div style='border: 1px solid #ccc; padding: 10px; margin: 10px; border-radius: 5px;'>
                <b>{i+1}. 📍 Page {metadata['page_number']}</b> (Similarity: {similarity:.3f})<br>
                <small>🏷️ Topics: {topics}</small><br>
                <p>{snippet}...</p>
            </div>
            """

    display(HTML(html_output))