In [None]:
# Install all required packages
!pip install pypdf2 pdf2image pdfplumber pytesseract opencv-python-headless pillow sentence-transformers faiss-cpu nltk

# Import all necessary libraries
import os
import io
import fitz
import pdfplumber
import numpy as np
from PIL import Image
import cv2
from google.colab import files
import faiss
from sentence_transformers import SentenceTransformer
from typing import List, Dict, Any
import json

class DocumentChunker:
    """Split text into overlapping chunks of whitespace-delimited words.

    ``chunk_size`` is a character budget: every word counts as its length
    plus one (for the joining space). ``chunk_overlap`` is the number of
    *words* repeated from the end of one chunk at the start of the next.
    """

    def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50):
        """Store the chunking parameters.

        Raises:
            ValueError: if ``chunk_size`` is not positive or
                ``chunk_overlap`` is negative.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        if chunk_overlap < 0:
            raise ValueError("chunk_overlap must not be negative")
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def create_chunks(self, text: str) -> List[str]:
        """
        Split text into overlapping chunks using a simple word-based approach.

        Words are accumulated until the running character count reaches
        ``chunk_size``; the chunk is then emitted and its last
        ``chunk_overlap`` words seed the next chunk. Any remaining words
        that have not appeared in an emitted chunk form a final chunk, so
        short inputs always yield exactly one chunk and no text is dropped.

        Returns:
            The list of chunks (empty for empty or whitespace-only text).
        """
        words = text.split() if text else []
        if not words:
            return []

        chunks = []
        current_chunk = []
        current_length = 0
        # Number of words in current_chunk that are not part of any emitted chunk yet.
        fresh_words = 0

        for word in words:
            current_chunk.append(word)
            current_length += len(word) + 1  # +1 for space
            fresh_words += 1

            if current_length >= self.chunk_size:
                chunks.append(' '.join(current_chunk))

                # A plain ``[-0:]`` slice would keep the *whole* list, so a
                # zero overlap has to be handled explicitly.
                if self.chunk_overlap > 0:
                    current_chunk = current_chunk[-self.chunk_overlap:]
                else:
                    current_chunk = []
                current_length = sum(len(kept) + 1 for kept in current_chunk)
                fresh_words = 0

        # Emit the tail only when it carries new words; a tail made purely of
        # overlap words would just duplicate the end of the previous chunk.
        if fresh_words:
            chunks.append(' '.join(current_chunk))

        return chunks

class VectorStore:
    """Text chunks, their metadata and a FAISS L2 index over their embeddings.

    ``chunks[i]`` and ``metadata[i]`` describe the vector stored at
    position ``i`` of ``index``.
    """

    def __init__(self, model_name: str = 'all-MiniLM-L6-v2'):
        """Load the sentence-transformer ``model_name`` and start with an empty store."""
        # Remembered so that save()/load() can restore the same encoder.
        self.model_name = model_name
        self.encoder = SentenceTransformer(model_name)
        self.index = None  # created lazily once the embedding dimension is known
        self.chunks = []
        self.metadata = []

    def add_documents(self, chunks: List[str], metadata: List[Dict[str, Any]] = None):
        """Embed ``chunks`` and append them (with their metadata) to the store.

        Args:
            chunks: texts to embed; an empty list is a no-op.
            metadata: one dict per chunk. When omitted, ``{"index": i}`` is
                generated with ``i`` being the chunk's position in the store.

        Raises:
            ValueError: if ``metadata`` is given but its length differs from
                ``chunks`` (this would misalign every later lookup).
        """
        if not chunks:
            return

        start_idx = len(self.chunks)
        if metadata is None:
            metadata = [{"index": i} for i in range(start_idx, start_idx + len(chunks))]
        elif len(metadata) != len(chunks):
            raise ValueError(
                f"metadata has {len(metadata)} entries but {len(chunks)} chunks were given"
            )

        embeddings = np.ascontiguousarray(self.encoder.encode(chunks), dtype='float32')

        if self.index is None:
            self.index = faiss.IndexFlatL2(embeddings.shape[1])

        self.index.add(embeddings)
        self.chunks.extend(chunks)
        self.metadata.extend(metadata)

    def search(self, query: str, k: int = 5) -> List[Dict[str, Any]]:
        """Return up to ``k`` stored chunks nearest to ``query``.

        Returns:
            A list of ``{'chunk', 'metadata', 'distance'}`` dicts in the order
            produced by the index; empty when the store has no documents.
        """
        if self.index is None or not self.chunks or k <= 0:
            return []

        # Never ask for more neighbours than there are vectors.
        k = min(k, self.index.ntotal)
        if k <= 0:
            return []

        query_embedding = np.ascontiguousarray(self.encoder.encode([query]), dtype='float32')
        distances, indices = self.index.search(query_embedding, k)

        results = []
        for idx, distance in zip(indices[0], distances[0]):
            idx = int(idx)
            # FAISS pads missing neighbours with -1, which must not be used
            # as a (negative) Python list index.
            if 0 <= idx < len(self.chunks):
                results.append({
                    'chunk': self.chunks[idx],
                    'metadata': self.metadata[idx],
                    'distance': float(distance)
                })

        return results

    def save(self, path: str):
        """Write ``{path}_index.faiss`` and ``{path}_data.json``.

        Nothing is written when no documents have been added yet.
        """
        if self.index is None:
            return

        faiss.write_index(self.index, f"{path}_index.faiss")
        with open(f"{path}_data.json", 'w', encoding='utf-8') as f:
            json.dump({
                'model_name': self.model_name,
                'chunks': self.chunks,
                'metadata': self.metadata
            }, f, ensure_ascii=False)

    @classmethod
    def load(cls, path: str):
        """Rebuild a store from the files written by :meth:`save`.

        Files saved without a ``model_name`` entry are loaded with the
        default encoder.
        """
        with open(f"{path}_data.json", 'r', encoding='utf-8') as f:
            data = json.load(f)

        model_name = data.get('model_name')
        instance = cls(model_name) if model_name else cls()
        instance.index = faiss.read_index(f"{path}_index.faiss")
        instance.chunks = data['chunks']
        instance.metadata = data['metadata']
        return instance

def extract_all_from_pdf(pdf_path):
    """
    Extract text, tables, and images from a PDF file.

    Text and tables are read with pdfplumber, embedded images with PyMuPDF
    (``fitz``). Extraction is best-effort: failures are printed and whatever
    was collected so far is still returned. The two passes are independent,
    so a failure in one does not prevent the other.

    Returns:
        A dict with three lists:
        ``'text'``   -> ``{'page', 'content'}`` per page that has text,
        ``'tables'`` -> ``{'page', 'content'}`` per table,
        ``'images'`` -> ``{'page', 'index', 'image'}`` per embedded image.
        Page numbers are 1-based.
    """
    extracted_data = {
        'text': [],
        'tables': [],
        'images': []
    }

    # Pass 1: text and tables.
    try:
        with pdfplumber.open(pdf_path) as pdf:
            for page_number, page in enumerate(pdf.pages, start=1):
                text = page.extract_text()
                if text:
                    extracted_data['text'].append({
                        'page': page_number,
                        'content': text
                    })

                for table in page.extract_tables() or []:
                    extracted_data['tables'].append({
                        'page': page_number,
                        'content': table
                    })
    except Exception as e:
        print(f"Error during extraction: {str(e)}")

    # Pass 2: embedded images.
    try:
        doc = fitz.open(pdf_path)
        try:
            for page_number, page in enumerate(doc, start=1):
                for img_index, img in enumerate(page.get_images()):
                    # One undecodable image must not abort the remaining ones.
                    try:
                        xref = img[0]
                        base_image = doc.extract_image(xref)
                        image = Image.open(io.BytesIO(base_image["image"]))
                    except Exception as e:
                        print(f"Error extracting image {img_index} on page {page_number}: {e}")
                        continue

                    extracted_data['images'].append({
                        'page': page_number,
                        'index': img_index,
                        'image': image
                    })
        finally:
            # Release the document even when a page raises.
            doc.close()
    except Exception as e:
        print(f"Error during extraction: {str(e)}")

    return extracted_data

def save_extracted_data(extracted_data, output_dir):
    """
    Save extracted content to files under ``output_dir`` (created if missing).

    Files written:
        ``page_{page}_text.txt``          one per text item,
        ``page_{page}_table_{i}.csv``     one per table, ``i`` being the
                                          table's position in
                                          ``extracted_data['tables']``,
        ``page_{page}_image_{index}.png`` one per image.

    Saving is best-effort: errors are printed instead of raised.
    """
    import csv

    # Image modes the PNG writer accepts; anything else (e.g. CMYK) is
    # converted to RGB first instead of failing the save.
    png_modes = {'1', 'L', 'LA', 'I', 'I;16', 'P', 'RGB', 'RGBA'}

    try:
        os.makedirs(output_dir, exist_ok=True)

        # Save text
        for text_item in extracted_data['text']:
            text_path = os.path.join(output_dir, f"page_{text_item['page']}_text.txt")
            with open(text_path, 'w', encoding='utf-8') as f:
                f.write(text_item['content'])

        # Save tables as CSV
        for i, table_item in enumerate(extracted_data['tables']):
            table_path = os.path.join(output_dir, f"page_{table_item['page']}_table_{i}.csv")
            with open(table_path, 'w', newline='', encoding='utf-8') as f:
                csv.writer(f).writerows(table_item['content'])

        # Save images
        for img_item in extracted_data['images']:
            img_path = os.path.join(
                output_dir, f"page_{img_item['page']}_image_{img_item['index']}.png"
            )
            image = img_item['image']
            # Handle each image on its own so one failure does not stop the rest.
            try:
                if image.mode not in png_modes:
                    image = image.convert('RGB')
                image.save(img_path)
            except Exception as e:
                print(f"Error saving image {img_path}: {e}")

    except Exception as e:
        print(f"Error saving data: {str(e)}")

def process_pdf_complete(pdf_path: str, output_dir: str, chunk_size: int = 500,
                         chunk_overlap: int = 50):
    """
    Complete pipeline: extract content, create chunks, and build a vector store.

    Args:
        pdf_path: PDF file to process.
        output_dir: directory that receives the extracted files and the
            saved vector store (``{output_dir}/vector_store_*``).
        chunk_size: character budget per chunk, passed to DocumentChunker.
        chunk_overlap: number of words shared by consecutive chunks, passed
            to DocumentChunker.

    Returns:
        The populated VectorStore. Each chunk's metadata holds ``type``
        (``'text'`` or ``'table'``), ``page`` and ``chunk_index``; table
        chunks additionally hold ``table_index``.
    """
    print(f"Processing PDF: {pdf_path}")

    # Step 1: Extract all content
    print("Extracting content...")
    extracted_data = extract_all_from_pdf(pdf_path)

    # Step 2: Initialize chunker and vector store
    print("Initializing chunking and vector storage...")
    chunker = DocumentChunker(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    vector_store = VectorStore()

    # Step 3: Process and chunk content
    print("Processing text and creating chunks...")
    all_chunks = []
    all_metadata = []

    # Process text
    for text_item in extracted_data['text']:
        chunks = chunker.create_chunks(text_item['content'])
        all_chunks.extend(chunks)
        all_metadata.extend(
            {'type': 'text', 'page': text_item['page'], 'chunk_index': i}
            for i in range(len(chunks))
        )

    # Process tables
    for table_index, table_item in enumerate(extracted_data['tables']):
        # Empty cells arrive as None; render them as '' so the literal word
        # "None" does not end up in the embedded text.
        table_text = '\n'.join(
            ' '.join('' if cell is None else str(cell) for cell in row)
            for row in table_item['content']
        )
        chunks = chunker.create_chunks(table_text)
        all_chunks.extend(chunks)
        # table_index is the table's position in extracted_data['tables'],
        # i.e. the same number save_extracted_data puts in the CSV file name,
        # so a search hit can be mapped back to its page_{page}_table_{i}.csv.
        all_metadata.extend(
            {
                'type': 'table',
                'page': table_item['page'],
                'chunk_index': i,
                'table_index': table_index
            }
            for i in range(len(chunks))
        )

    # Step 4: Add to vector store
    print("Creating embeddings and building vector index...")
    if all_chunks:
        vector_store.add_documents(all_chunks, all_metadata)

    # Step 5: Save everything
    print("Saving extracted content and vector store...")
    save_extracted_data(extracted_data, output_dir)
    vector_store.save(f"{output_dir}/vector_store")

    print("\nExtraction Summary:")
    print(f"Pages with text: {len(extracted_data['text'])}")
    print(f"Tables found: {len(extracted_data['tables'])}")
    print(f"Images found: {len(extracted_data['images'])}")
    print(f"Total chunks created: {len(all_chunks)}")
    print(f"\nAll content saved to: {output_dir}")

    return vector_store

def main():
    """Upload PDF files through Colab, index each one and run a sample search.

    Any error raised while handling a file is printed and the next file is
    processed.
    """
    print("Upload a PDF file:")
    uploaded = files.upload()

    # Iterating the returned mapping yields the uploaded file names.
    for filename in uploaded:
        output_dir = f'extracted_{os.path.splitext(filename)[0]}'
        try:
            store = process_pdf_complete(filename, output_dir)

            # Smoke-test the freshly built index with a fixed query.
            print("\nTesting vector search...")
            query = "What is the main topic?"
            hits = store.search(query, k=3)

            print(f"\nTop 3 results for query: '{query}'")
            for i, result in enumerate(hits):
                print(f"\nResult {i+1}:")
                print(f"Content: {result['chunk'][:200]}...")
                print(f"Page: {result['metadata']['page']}")
                print(f"Type: {result['metadata']['type']}")
                print(f"Distance: {result['distance']:.4f}")

        except Exception as e:
            print(f"Error processing {filename}: {str(e)}")

In [None]:
import numpy as np
from sentence_transformers import SentenceTransformer
import faiss
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import json
import pandas as pd
import os
import glob

@dataclass
class SearchResult:
    """One hit produced by QueryHandler.search."""
    # Text of the matched chunk, or a "Table from page N" label for direct table hits.
    chunk: str
    # Metadata stored with the chunk; code in this file reads the 'page',
    # 'type' and 'table_index' keys.
    metadata: Dict[str, Any]
    # Score returned by the index search, or 1.0 for direct table hits.
    similarity_score: float
    # Rows of the table attached to this hit, or None when no table was loaded.
    table_data: Optional[List[List[str]]] = None

def _read_table_rows(csv_path: str) -> List[List[str]]:
    """Return every row of ``csv_path`` as a list of strings, exactly as stored.

    The stdlib reader is used instead of ``pd.read_csv`` because the files
    are raw cell grids: there is no guaranteed header row, duplicate or blank
    first-row cells must not be renamed, and empty cells must stay ``''``
    rather than become NaN.
    """
    import csv

    with open(csv_path, newline='', encoding='utf-8') as f:
        return list(csv.reader(f))


def _table_file_sort_key(csv_path: str):
    """Order ``..._table_{n}.csv`` files by the numeric value of ``n``."""
    stem = os.path.splitext(os.path.basename(csv_path))[0]
    suffix = stem.rsplit('_', 1)[-1]
    if suffix.isdecimal():
        return (0, int(suffix), csv_path)
    # Files with a non-numeric suffix sort after the numbered ones.
    return (1, 0, csv_path)


def read_csv_table(output_dir: str, page_number: int, table_index: int = 0) -> Optional[List[List[str]]]:
    """
    Read a specific table from the saved CSV files.

    Looks for ``page_{page_number}_table_{table_index}.csv`` in ``output_dir``.

    Returns:
        The table as a list of rows of strings, or None when the file does
        not exist, is empty, or cannot be read (the error is printed).
    """
    csv_path = os.path.join(output_dir, f"page_{page_number}_table_{table_index}.csv")
    if not os.path.exists(csv_path):
        return None

    try:
        rows = _read_table_rows(csv_path)
    except Exception as e:
        print(f"Error reading CSV file: {e}")
        return None

    return rows or None


def get_tables_for_page(output_dir: str, page_number: int) -> List[List[List[str]]]:
    """
    Get all tables from a specific page.

    Every ``page_{page_number}_table_*.csv`` file in ``output_dir`` is read,
    in ascending numeric order of its table index. Unreadable or empty files
    are skipped (read errors are printed).
    """
    # Escape the directory so characters such as '[' in its name are not
    # interpreted as glob syntax.
    pattern = os.path.join(glob.escape(output_dir), f"page_{page_number}_table_*.csv")

    tables = []
    for csv_file in sorted(glob.glob(pattern), key=_table_file_sort_key):
        try:
            rows = _read_table_rows(csv_file)
        except Exception as e:
            print(f"Error reading {csv_file}: {e}")
            continue
        if rows:
            tables.append(rows)
    return tables

def format_table(table_data: List[List[Any]]) -> str:
    """Format table data into a readable, boxed text grid.

    The first row is rendered as the header, separated from the remaining
    rows by a rule. ``None`` cells become empty strings, line breaks inside a
    cell are replaced by spaces so that every table row stays on one output
    line, and short rows are padded with empty cells so no column is dropped.

    Returns:
        The formatted table, or ``""`` when ``table_data`` is empty.
    """
    if not table_data:
        return ""

    # Normalise every cell to a single-line string.
    rows = [
        ['' if cell is None else ' '.join(str(cell).splitlines()) for cell in row]
        for row in table_data
    ]

    # Pad ragged rows: zipping rows of unequal length would silently cut the
    # table down to its shortest row.
    column_count = max(len(row) for row in rows)
    rows = [row + [''] * (column_count - len(row)) for row in rows]

    # Calculate column widths
    col_widths = [max(len(row[col]) for row in rows) for col in range(column_count)]

    # Create formatted rows
    formatted_rows = []
    for row in rows:
        formatted_row = " | ".join(f"{cell:<{width}}" for cell, width in zip(row, col_widths))
        formatted_rows.append(f"| {formatted_row} |")

    # Create separator
    separator = "+{}+".format("+".join("-" * (width + 2) for width in col_widths))

    # Combine everything
    return "\n".join([
        separator,
        formatted_rows[0],
        separator,
        *formatted_rows[1:],
        separator
    ])

class QueryHandler:
    """Run cosine-similarity queries against a VectorStore.

    Construction replaces ``vector_store.index`` in place with an
    inner-product index over L2-normalised vectors, so index scores are
    cosine similarities. Table content is read back from the CSV files in
    ``output_dir``.
    """

    def __init__(self, vector_store: VectorStore, output_dir: str):
        """
        Args:
            vector_store: store whose chunks are searched; its ``index`` is
                replaced by a cosine-similarity index when one exists.
            output_dir: directory holding the ``page_*_table_*.csv`` files.
        """
        self.vector_store = vector_store
        self.output_dir = output_dir
        if vector_store.index is not None:
            dimension = vector_store.index.d
            self._replace_index_with_cosine(dimension)

    def _stored_vectors(self, index, dimension: int):
        """Return the store's embeddings as a float32 ``(n, dimension)`` array.

        The vectors are copied out of the existing index when it holds
        exactly one vector per chunk, which avoids re-embedding the whole
        document each time a handler is created. Otherwise the chunks are
        encoded again. Returns None when the store has no chunks.
        """
        chunks = self.vector_store.chunks
        if not chunks:
            return None

        if index is not None and index.ntotal == len(chunks):
            try:
                vectors = index.reconstruct_n(0, index.ntotal)
                return np.ascontiguousarray(vectors, dtype='float32').reshape(-1, dimension)
            except (RuntimeError, AttributeError, TypeError):
                # Index type cannot hand its vectors back; re-encode below.
                pass

        return np.ascontiguousarray(self.vector_store.encoder.encode(chunks), dtype='float32')

    def _replace_index_with_cosine(self, dimension: int):
        """Swap the store's index for an inner-product index of unit vectors."""
        vectors = self._stored_vectors(self.vector_store.index, dimension)
        cosine_index = faiss.IndexFlatIP(dimension)
        if vectors is not None and len(vectors):
            # Normalising is harmless when the vectors are already unit length.
            faiss.normalize_L2(vectors)
            cosine_index.add(vectors)
        self.vector_store.index = cosine_index

    def get_table_data(self, page_number: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Get table data for a specific page.

        Returns:
            One ``{'page', 'table_index', 'table_data'}`` dict per table found
            for ``page_number``; ``table_index`` is the table's position in
            that list. An empty list is returned when ``page_number`` is None.
        """
        if page_number is None:
            return []

        tables = get_tables_for_page(self.output_dir, page_number)
        return [{
            'page': page_number,
            'table_index': idx,
            'table_data': table
        } for idx, table in enumerate(tables)]

    def search(self, query: str, page_number: Optional[int] = None, k: int = 5) -> List[SearchResult]:
        """Return up to ``k`` results for ``query``.

        If the query mentions "table"/"tabular" and a page is given, the
        tables saved for that page are returned directly (score 1.0).
        Otherwise, or when no such table exists, the vector index is
        searched; with ``page_number`` set, only chunks from that page are
        returned.
        """
        results = []
        if k <= 0:
            return results

        # If query is about tables, prioritize table retrieval
        query_lower = query.lower()
        if ('table' in query_lower or 'tabular' in query_lower) and page_number is not None:
            for table_info in self.get_table_data(page_number):
                results.append(SearchResult(
                    chunk=f"Table from page {table_info['page']}",
                    metadata={
                        'type': 'table',
                        'page': table_info['page'],
                        'table_index': table_info['table_index']
                    },
                    similarity_score=1.0,  # Direct match for tables
                    table_data=table_info['table_data']
                ))

        if results:
            return results[:k]

        # If no tables found or query is not about tables, perform regular search
        index = self.vector_store.index
        chunks = self.vector_store.chunks
        if index is None or not chunks:
            return results

        # With a page filter the best chunks of that page may rank below the
        # global top-k, so rank every chunk and filter afterwards.
        candidate_count = len(chunks) if page_number is not None else min(k, len(chunks))

        query_embedding = np.ascontiguousarray(
            self.vector_store.encoder.encode([query]), dtype='float32'
        )
        faiss.normalize_L2(query_embedding)
        similarities, indices = index.search(query_embedding, candidate_count)

        for idx, similarity in zip(indices[0], similarities[0]):
            idx = int(idx)
            # FAISS uses -1 for "no neighbour"; skip it rather than index from the end.
            if not 0 <= idx < len(chunks):
                continue

            metadata = self.vector_store.metadata[idx]
            if page_number is not None and metadata.get('page') != page_number:
                continue

            result = SearchResult(
                chunk=chunks[idx],
                metadata=metadata,
                similarity_score=float(similarity)
            )

            # Try to load table data if it's a table type
            if metadata.get('type') == 'table':
                table_data = read_csv_table(
                    self.output_dir,
                    metadata['page'],
                    metadata.get('table_index', 0)
                )
                if table_data:
                    result.table_data = table_data

            results.append(result)
            if len(results) >= k:
                break

        return results[:k]

def process_query(vector_store: VectorStore, query: str, output_dir: str, k: int = 5) -> Dict[str, Any]:
    """
    Process a user query and return relevant results with metadata.

    A page restriction is taken from the first "page <number>" mention in the
    query (case-insensitive; trailing punctuation such as "page 6?" is fine).

    Returns:
        A dict with ``'results'`` (list of SearchResult), ``'prompt'`` (the
        question combined with the retrieved context, or a not-found
        message) and ``'page_number'`` (the parsed page or None).
    """
    import re

    query_handler = QueryHandler(vector_store, output_dir)

    # Extract page number if mentioned in query. A regex is used because
    # splitting on whitespace leaves punctuation glued to the number
    # ("6?"), which int() rejects.
    page_match = re.search(r'\bpage\s*#?\s*(\d+)', query, flags=re.IGNORECASE)
    page_number = int(page_match.group(1)) if page_match else None

    # Perform search
    results = query_handler.search(query, page_number=page_number, k=k)

    # Generate response
    if not results:
        prompt = f"Question: {query}\n\nI could not find any relevant information in the document to answer this question."
    else:
        context_parts = []
        for result in results:
            if result.table_data:
                context_parts.append(f"\nTable from page {result.metadata['page']}:\n{format_table(result.table_data)}")
            else:
                context_parts.append(f"[Page {result.metadata['page']}] {result.chunk}")

        prompt = f"""Please answer the following question based on this context from the document:

Context:
{' '.join(context_parts)}

Question: {query}

Answer:"""

    return {
        'results': results,
        'prompt': prompt,
        'page_number': page_number
    }

def main():
    """Upload PDF files through Colab, index each one and answer questions interactively.

    For every uploaded file the document is processed and the user is
    prompted for questions until 'quit' is entered. Errors are printed and
    then re-raised.
    """
    print("Upload a PDF file:")
    uploaded = files.upload()

    # Iterating the returned mapping yields the uploaded file names.
    for filename in uploaded:
        output_dir = f'extracted_{os.path.splitext(filename)[0]}'
        try:
            vector_store = process_pdf_complete(filename, output_dir)

            print(f"\nDocument processed successfully!")
            print(f"Total chunks: {len(vector_store.chunks)}")
            print("\nYou can now ask questions about the document.")
            print("To ask about specific pages, include 'page X' in your question.")
            print("Type 'quit' to exit.")

            # Question loop: runs until the user types 'quit'.
            while True:
                print("\n" + "-"*80)
                query = input("Enter your question: ").strip()
                if query.lower() == 'quit':
                    break

                if query:
                    results = process_query(vector_store, query, output_dir)
                    # NOTE(review): display_results is not defined in the
                    # visible part of this file - confirm it exists elsewhere.
                    display_results(results)
                else:
                    print("Please enter a question.")

        except Exception as e:
            print(f"Error processing {filename}: {str(e)}")
            raise e

Question: table from page 6?