In [None]:
!apt-get install poppler-utils -q
!apt-get install tesseract-ocr-all
!pip install tiktoken -q
!pip install unstructured -q
!pip install unstructured['pdf'] -q
!pip install python-docx

In [None]:
import zipfile
import os

# Archive to unpack and the folder that receives its contents
files_path = '/content/Dr.X Files.zip'
extract_path = '/content/files'

# The target folder is created on demand; an existing folder is reused as-is
os.makedirs(extract_path, exist_ok=True)

# Unpack every member of the archive into the target folder
with zipfile.ZipFile(files_path, mode='r') as zip_ref:
    zip_ref.extractall(path=extract_path)

print("Files extracted to: {}".format(extract_path))

Files extracted to: /content/files


In [None]:
import os
import re
import json
import tiktoken
import hashlib
from typing import List, Dict, Any, Optional, Set
from unstructured.partition.pdf import partition_pdf
from unstructured.partition.docx import partition_docx
from unstructured.partition.xlsx import partition_xlsx
from unstructured.partition.csv import partition_csv
from unstructured.staging.base import elements_to_json
from unstructured.cleaners.core import clean_extra_whitespace, replace_unicode_quotes

### Document Chunking Strategy:

- **Semantic Chunking**: Creates chunks based on natural document boundaries like `headings` and `sections` rather than arbitrary splits
- **Size-Based Chunking**: Starts a new chunk once a configurable token limit would be exceeded (default: `512` tokens)
- **Overlap Mechanism**: Maintains context between consecutive chunks by carrying the tail of the previous chunk into the next one (default: `50` tokens)
- **Element Type Awareness**: Preserves structure by handling different content types (`text`, `tables`, `images`) appropriately
- **Boundary Detection**: Automatically identifies `semantic boundaries` at headings, titles, and section markers
- **Table Handling**: Special handling for tables, keeping them intact when possible or creating dedicated chunks
- **Metadata Preservation**: Each chunk retains source information, page numbers, and element types
- **Empty Element Filtering**: Skips empty elements of certain types that don't contribute to content.

In [None]:
class EnhancedDocumentChunker:
    """Parse documents into element dicts and group them into token-bounded chunks.

    Parsing is delegated to the ``unstructured`` partition functions; token
    counting uses a ``tiktoken`` encoding. Chunks are plain dicts so they can be
    dumped to JSON directly.
    """

    # Lower-cased element types that always open a new chunk.
    _BOUNDARY_TYPES = ("heading", "title", "section", "subsection", "header")

    def __init__(self, chunk_size: int = 512, chunk_overlap: int = 50,
                 tokenizer_name: str = "cl100k_base",
                 skip_if_empty: Optional[List[str]] = None,
                 ):
        """Configure the chunker.

        Args:
            chunk_size: Maximum number of tokens per chunk (tables are kept
                intact and may exceed it).
            chunk_overlap: Number of trailing tokens of a size-split chunk that
                are repeated at the start of the next one.
            tokenizer_name: Name of the tiktoken encoding used to count tokens.
            skip_if_empty: Element types dropped when they carry no text.
                Matching is case-insensitive. Defaults to image and figure.

        Raises:
            ValueError: If ``chunk_size`` is not positive or ``chunk_overlap``
                is negative.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        if chunk_overlap < 0:
            raise ValueError("chunk_overlap must not be negative")

        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.tokenizer_name = tokenizer_name
        self.tokenizer = tiktoken.get_encoding(tokenizer_name)
        # Element types are lower-cased before comparison (see _get_element_type),
        # so the configured names are stored lower-cased as well.
        self.skip_if_empty = [str(name).lower() for name in (skip_if_empty or ["image", "figure"])]

    def process_document(self, file_path: str, include_page_breaks: bool = True,
                         strategy: str = "hi_res", infer_table_structure: bool = True,
                         hi_res_model_name: str = None) -> List[Dict]:
        """Parse one document (PDF, DOCX, XLSX/XLS or CSV) into cleaned element dicts.

        Args:
            file_path: Path of the document; the parser is chosen from its extension.
            include_page_breaks: Forwarded to the PDF and DOCX parsers.
            strategy: Forwarded to the PDF and DOCX parsers.
            infer_table_structure: Forwarded to the PDF and DOCX parsers.
            hi_res_model_name: Forwarded to the PDF and DOCX parsers.

        Returns:
            The parsed elements as dicts with normalised text, minus empty
            elements whose type is listed in ``skip_if_empty``. Any failure
            (including an unsupported extension) is printed and yields ``[]``.
        """
        print(f"Processing: {file_path}")
        file_extension = os.path.splitext(file_path)[1].lower()

        try:
            if file_extension == '.pdf':
                elements = partition_pdf(
                    filename=file_path,
                    include_page_breaks=include_page_breaks,
                    include_metadata=True,
                    strategy=strategy,
                    infer_table_structure=infer_table_structure,
                    hi_res_model_name=hi_res_model_name,
                )
            elif file_extension in ('.docx', '.doc'):
                # NOTE(review): legacy binary .doc files are routed to the DOCX
                # parser here; confirm it accepts them (a failure is caught below
                # and reported as an error for that file).
                elements = partition_docx(
                    filename=file_path,
                    include_page_breaks=include_page_breaks,
                    include_metadata=True,
                    strategy=strategy,
                    infer_table_structure=infer_table_structure,
                    hi_res_model_name=hi_res_model_name,
                )
            elif file_extension in ('.xlsx', '.xls'):
                elements = partition_xlsx(
                    filename=file_path,
                    include_metadata=True,
                )
            elif file_extension == '.csv':
                elements = partition_csv(
                    filename=file_path,
                    include_metadata=True
                )
            else:
                raise ValueError(f"Unsupported file format: {file_extension}")

            # Plain dicts are easier to post-process and to dump as JSON.
            element_dicts = json.loads(elements_to_json(elements))

            # Clean and normalize text
            for element in element_dicts:
                if "text" in element:
                    element["text"] = self._clean_text(element["text"])

            return self._filter_empty_elements(element_dicts)

        except Exception as e:
            # Best effort: one unreadable file must not stop a batch run.
            print(f"Error processing {file_path}: {str(e)}")
            return []

    def process_directory(self, directory_path: str, extensions: List[str] = None,
                          include_page_breaks: bool = True, strategy: str = "hi_res",
                          hi_res_model_name: str = None,
                          infer_table_structure: bool = True,
                          output_dir: Optional[str] = None) -> Dict[str, List[Dict]]:
        """Parse every matching file in a directory and save each result as JSON.

        Args:
            directory_path: Directory whose direct children are scanned (no recursion).
            extensions: File extensions to process, matched case-insensitively,
                with or without the leading dot. Defaults to
                ``['.pdf', '.docx', '.doc']``; pass e.g. ``'.xlsx'`` or ``'.csv'``
                explicitly to include spreadsheets.
            include_page_breaks: Forwarded to ``process_document``.
            strategy: Forwarded to ``process_document``.
            hi_res_model_name: Forwarded to ``process_document``.
            infer_table_structure: Forwarded to ``process_document``.
            output_dir: Directory for the per-document JSON files. Defaults to
                ``processed_json`` under the current working directory.

        Returns:
            Mapping of file name to its list of element dicts; files that
            yielded no elements are omitted.
        """
        # Set default extensions if none are provided
        if extensions is None:
            extensions = ['.pdf', '.docx', '.doc']
        allowed_extensions = {
            ext.lower() if ext.startswith('.') else f".{ext.lower()}"
            for ext in extensions
        }

        # Create directory to store processed JSON files
        if output_dir is None:
            output_dir = os.path.join(os.getcwd(), "processed_json")
        os.makedirs(output_dir, exist_ok=True)

        all_documents = {}
        # Sorted so that processing order (and the order of the result) is reproducible.
        for filename in sorted(os.listdir(directory_path)):
            file_path = os.path.join(directory_path, filename)
            if not os.path.isfile(file_path):
                continue
            if os.path.splitext(filename)[1].lower() not in allowed_extensions:
                continue

            try:
                elements = self.process_document(
                    file_path,
                    include_page_breaks=include_page_breaks,
                    strategy=strategy,
                    hi_res_model_name=hi_res_model_name,
                    infer_table_structure=infer_table_structure
                )

                if elements:
                    all_documents[filename] = elements
                    print(f"Extracted {len(elements)} elements from {filename}")

                    # Save each processed document to a separate JSON file
                    base_name = os.path.splitext(filename)[0]
                    output_file_path = os.path.join(output_dir, f"{base_name}.json")
                    with open(output_file_path, 'w', encoding='utf-8') as f:
                        json.dump(elements, f, indent=2, ensure_ascii=False)
                    print(f"Saved JSON for {filename} to {output_file_path}")
                else:
                    print(f"No elements extracted from {filename}")
            except Exception as e:
                print(f"Error processing {filename}: {str(e)}")

        return all_documents

    def create_semantic_chunks(self, elements: List[Dict[Any, Any]], source: str = "") -> List[Dict[Any, Any]]:
        """Group parsed elements into text chunks with metadata.

        A new chunk is started at semantic boundaries (see
        ``_is_semantic_boundary``) and whenever adding the next element would
        exceed ``chunk_size`` tokens. Size-based splits carry up to
        ``chunk_overlap`` trailing tokens into the next chunk, trimmed so the
        new chunk still fits. A non-table element that alone exceeds
        ``chunk_size`` is cut into overlapping token windows. Tables that carry
        ``text_as_html`` are never split, so such a chunk may exceed the limit.

        Args:
            elements: Element dicts as produced by ``process_document``.
            source: Label stored in every chunk (typically the file name).

        Returns:
            List of chunk dicts with the keys ``source``, ``pages``,
            ``chunk_number``, ``text``, ``token_count``, ``element_types`` and,
            when the chunk contains tables, ``tables`` (their HTML).
            ``token_count`` is the sum of the per-element token counts; the
            separators inserted between elements are not counted.
        """
        if not elements:
            return []

        chunks = []

        # State of the chunk currently being assembled.
        chunk_text = ""
        chunk_tokens = []
        chunk_pages = set()
        chunk_types = set()
        chunk_tables = []

        def flush():
            """Store the chunk under construction; blank chunks are dropped."""
            if not chunk_text.strip():
                return
            chunk = {
                "source": source,
                "pages": sorted(chunk_pages),
                "chunk_number": len(chunks),
                "text": chunk_text,
                "token_count": len(chunk_tokens),
                # Sorted so the saved JSON is identical from run to run.
                "element_types": sorted(chunk_types),
            }
            if chunk_tables:
                chunk["tables"] = list(chunk_tables)
            chunks.append(chunk)

        def start_chunk(text="", tokens=(), pages=(), types=(), tables=()):
            """Replace the chunk state; arguments are copied, never aliased."""
            nonlocal chunk_text, chunk_tokens, chunk_pages, chunk_types, chunk_tables
            chunk_text = text
            chunk_tokens = list(tokens)
            chunk_pages = set(pages)
            chunk_types = set(types)
            chunk_tables = list(tables)

        for element in elements:
            element_text = self._get_element_text(element)
            element_type = self._get_element_type(element)
            page = self._get_element_page_number(element)
            table_html = self._get_element_metadata(element).get("text_as_html")

            # Skip elements that carry neither text nor table markup
            if not element_text and not table_html:
                continue

            element_tokens = self.tokenizer.encode(element_text)
            element_pages = {page} if page is not None else set()

            # Tables with HTML are kept whole and their markup is recorded
            if element_type == "table" and table_html:
                if chunk_text and len(chunk_tokens) + len(element_tokens) > self.chunk_size:
                    # The table does not fit: close the chunk and open one with the table
                    flush()
                    start_chunk(element_text, element_tokens, element_pages,
                                {element_type}, [table_html])
                else:
                    if element_text:
                        chunk_text = f"{chunk_text}\n\n{element_text}" if chunk_text else element_text
                    chunk_tokens.extend(element_tokens)
                    chunk_pages.update(element_pages)
                    chunk_types.add(element_type)
                    chunk_tables.append(table_html)
                continue

            # A heading/title closes the running chunk and opens a fresh one
            if chunk_text and self._is_semantic_boundary(element):
                flush()
                start_chunk()

            # An element that can never fit is cut into token windows
            if len(element_tokens) > self.chunk_size:
                flush()
                windows = self._split_oversized_tokens(element_tokens)
                for window in windows[:-1]:
                    start_chunk(self.tokenizer.decode(window), window,
                                element_pages, {element_type})
                    flush()
                # The last window stays open so following elements can join it
                start_chunk(self.tokenizer.decode(windows[-1]), windows[-1],
                            element_pages, {element_type})
                continue

            # Size limit reached: close the chunk and seed the next one with overlap
            if chunk_text and len(chunk_tokens) + len(element_tokens) > self.chunk_size:
                flush()
                # Never take more overlap than the room left beside the new element
                overlap = min(self.chunk_overlap, self.chunk_size - len(element_tokens))
                if overlap > 0 and len(chunk_tokens) > self.chunk_overlap:
                    tail = chunk_tokens[-overlap:]
                    # Element types are carried over because the overlap text stems
                    # from the previous elements; tables are not repeated.
                    start_chunk(self.tokenizer.decode(tail), tail, element_pages, chunk_types)
                else:
                    start_chunk()

            # Add current element to the chunk
            chunk_text = f"{chunk_text} {element_text}" if chunk_text else element_text
            chunk_tokens.extend(element_tokens)
            chunk_pages.update(element_pages)
            chunk_types.add(element_type)

        # Add the final chunk if there's any content left
        flush()

        return chunks

    def process_and_chunk_document(self, file_path: str) -> List[Dict[Any, Any]]:
        """Parse one document and chunk it; the chunk source is the file's base name."""

        elements = self.process_document(file_path)
        if not elements:
            return []

        source = os.path.basename(file_path)
        return self.create_semantic_chunks(elements, source)

    def process_and_chunk_documents(self, documents: Dict[str, List[Dict]]) -> List[Dict[Any, Any]]:
        """Chunk already-parsed documents.

        Args:
            documents: Mapping of file name to element dicts, as returned by
                ``process_directory``.

        Returns:
            The chunks of all documents in one flat list, in mapping order.
        """
        all_chunks = []
        for filename, elements in documents.items():
            document_chunks = self.create_semantic_chunks(elements, filename)
            all_chunks.extend(document_chunks)
            print(f"Created {len(document_chunks)} chunks for {filename}")

        return all_chunks

    def save_chunks(self, chunks: List[Dict[Any, Any]], output_dir: str = "chunked_data") -> None:
        """Write chunks to ``output_dir`` as JSON.

        One ``<source base name>_chunks.json`` file is written per source, plus
        ``all_chunks.json`` with every chunk. Chunks without a source are
        grouped under ``unknown``.
        """
        if not chunks:
            print("No chunks to save.")
            return

        os.makedirs(output_dir, exist_ok=True)

        # Group chunks by source
        chunks_by_source = {}
        for chunk in chunks:
            source = chunk.get("source") or "unknown"
            chunks_by_source.setdefault(source, []).append(chunk)

        # Save individual source files
        for source, source_chunks in chunks_by_source.items():
            # basename: a source given as a path must not escape output_dir
            base_name = os.path.splitext(os.path.basename(source))[0]
            output_file = os.path.join(output_dir, f"{base_name}_chunks.json")
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(source_chunks, f, ensure_ascii=False, indent=2)
            print(f"Saved {len(source_chunks)} chunks for {source}")

        # Save all chunks to a single file
        all_chunks_file = os.path.join(output_dir, "all_chunks.json")
        with open(all_chunks_file, 'w', encoding='utf-8') as f:
            json.dump(chunks, f, ensure_ascii=False, indent=2)

        print(f"Saved {len(chunks)} total chunks to {output_dir}/all_chunks.json")

    def _filter_empty_elements(self, element_dicts: List[Dict]) -> List[Dict]:
        """Drop elements without text whose type is listed in ``skip_if_empty``."""
        # Lower-cased again here because the attribute is public and may have
        # been reassigned with differently-cased names after construction.
        skippable = {str(name).lower() for name in self.skip_if_empty}
        return [
            element for element in element_dicts
            if not (self._get_element_type(element) in skippable
                    and not self._get_element_text(element))
        ]

    def _split_oversized_tokens(self, tokens: List[int]) -> List[List[int]]:
        """Cut a token list into windows of at most ``chunk_size`` tokens.

        Consecutive windows share ``chunk_overlap`` tokens (capped so every
        window still advances by at least one token).
        """
        size = max(1, self.chunk_size)
        overlap = min(max(0, self.chunk_overlap), size - 1)
        step = size - overlap

        windows = []
        start = 0
        while True:
            windows.append(tokens[start:start + size])
            if start + size >= len(tokens):
                break
            start += step
        return windows

    def _clean_text(self, text: str) -> str:
        """Normalise whitespace and quotes; empty or missing text becomes ``""``."""

        if not text:
            return ""

        # Clean extra whitespace
        text = clean_extra_whitespace(text)
        # Replace Unicode quotes
        text = replace_unicode_quotes(text)

        return text.strip()

    def _get_element_text(self, element: Dict) -> str:
        """Return the element's stripped text, or ``""`` when it is missing or None."""

        return (element.get("text") or "").strip()

    def _get_element_type(self, element: Dict) -> str:
        """Return the element's type in lower case (``"unknown"`` when absent)."""

        return (element.get("type") or "Unknown").lower()

    def _get_element_metadata(self, element: Dict) -> Dict:
        """Return the element's metadata dict, or an empty dict when absent."""

        return element.get("metadata") or {}

    def _get_element_page_number(self, element: Dict) -> Optional[int]:
        """Return ``metadata["page_number"]`` or None when it is not set."""

        return self._get_element_metadata(element).get("page_number")

    def _is_semantic_boundary(self, element: Dict) -> bool:
        """Tell whether the element should open a new chunk.

        True for heading-like element types and for page-break elements that
        carry text.
        """
        element_type = self._get_element_type(element)

        # Check for headings, titles, section markers
        if element_type in self._BOUNDARY_TYPES:
            return True

        # Types are lower-cased, so "PageBreak" arrives as "pagebreak"; accept
        # the "page_break" / "page-break" spellings as well.
        normalized_type = element_type.replace("_", "").replace("-", "")
        if "pagebreak" in normalized_type and self._get_element_text(element):
            return True

        return False

In [None]:
# Initialize the chunker
chunker = EnhancedDocumentChunker(
    chunk_size=512,
    chunk_overlap=50,
    tokenizer_name="cl100k_base",
    # Element types are lower-cased before they are compared against this list,
    # so the entries must be lower case for empty images/figures to be dropped.
    skip_if_empty=["image", "figure"],
)

# Process the documents
documents = chunker.process_directory("/content/files")

# Create chunks
chunks = chunker.process_and_chunk_documents(documents)

# Save chunks
chunker.save_chunks(chunks)
print(f"Chunking complete. Created {len(chunks)} total chunks.")

Processing: /content/files/new-approaches-and-procedures-for-cancer-treatment.pdf
Extracted 234 elements from new-approaches-and-procedures-for-cancer-treatment.pdf
Saved JSON for new-approaches-and-procedures-for-cancer-treatment.pdf to /content/processed_json/new-approaches-and-procedures-for-cancer-treatment.json
Processing: /content/files/The_Plan_of_the_Giza_Pyramids.pdf
Extracted 218 elements from The_Plan_of_the_Giza_Pyramids.pdf
Saved JSON for The_Plan_of_the_Giza_Pyramids.pdf to /content/processed_json/The_Plan_of_the_Giza_Pyramids.json
Processing: /content/files/The-Alchemist.pdf
Extracted 1435 elements from The-Alchemist.pdf
Saved JSON for The-Alchemist.pdf to /content/processed_json/The-Alchemist.json
Processing: /content/files/Stats.docx
Extracted 46 elements from Stats.docx
Saved JSON for Stats.docx to /content/processed_json/Stats.json
Processing: /content/files/Ocean_ecogeochemistry_A_review.pdf
Extracted 1235 elements from Ocean_ecogeochemistry_A_review.pdf
Saved JSON 

In [None]:
# Archive both output folders (the chunk JSON files and the per-document element
# JSON files) so they can be reused later without processing the documents again.
# NOTE(review): the absolute /content/... paths assume the notebook's working
# directory was /content when the folders were written — confirm.
!zip -r /content/chunked_data.zip /content/chunked_data
!zip -r /content/processed_json.zip /content/processed_json

  adding: content/chunked_data/ (stored 0%)
  adding: content/chunked_data/The-Alchemist_chunks.json (deflated 71%)
  adding: content/chunked_data/Ocean_ecogeochemistry_A_review_chunks.json (deflated 74%)
  adding: content/chunked_data/Stats_chunks.json (deflated 72%)
  adding: content/chunked_data/new-approaches-and-procedures-for-cancer-treatment_chunks.json (deflated 68%)
  adding: content/chunked_data/all_chunks.json (deflated 73%)
  adding: content/chunked_data/The_Plan_of_the_Giza_Pyramids_chunks.json (deflated 72%)
  adding: content/chunked_data/M.Sc. Applied Psychology_chunks.json (deflated 76%)
  adding: content/chunked_data/Dataset summaries and citations_chunks.json (deflated 84%)
  adding: content/processed_json/ (stored 0%)
  adding: content/processed_json/Dataset summaries and citations.json (deflated 86%)
  adding: content/processed_json/M.Sc. Applied Psychology.json (deflated 89%)
  adding: content/processed_json/The-Alchemist.json (deflated 86%)
  adding: content/proce