In [None]:
%pip install azure-storage-blob

%pip install pdfplumber

!python -m spacy download en_core_web_sm

%pip install prettytable

%pip install tiktoken

In [None]:
# Restart the Python process through the Databricks `dbutils` utility before the cells below run.
# NOTE(review): presumably needed so the packages installed in the previous cell become importable — confirm.
dbutils.library.restartPython()

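File paths go here. The cells below expect `pdf_df` and the `fileN` path variables (`file1`, `file2`, …) to be defined in this section.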

In [None]:
from azure.storage.blob import BlobServiceClient
from pyspark.sql.functions import *

In [None]:
'''USEFUL LINKS
https://www.babelstreet.com/blog/what-is-entity-extraction#:~:text=Entity%20extraction%20(aka%2C%20named%20entity,%2C%20webpages%2C%20text%20fields).

https://medium.com/@sanskrutikhedkar09/mastering-information-extraction-from-unstructured-text-a-deep-dive-into-named-entity-recognition-4aa2f664a453

https://www.microfocus.com/documentation/relativity/relativity1217/reldbdsn/GUID-7C2DF185-41A1-4448-81E7-3252AA8DEBB3.html 

'''

import spacy
from spacy.pipeline import EntityRuler
import re
from typing import List, Dict, Any

def create_nlp_pipeline():
    """
    Build the spaCy pipeline used for entity extraction in this notebook.

    Loads the ``en_core_web_sm`` model, inserts an ``entity_ruler`` component
    ahead of the statistical ``ner`` component, and registers the token
    patterns for the custom labels (TECH_DOC, SYSTEM_COMPONENT,
    HARDWARE_COMPONENT, COMMUNICATION_COMPONENT, SUBSYSTEM_COMPONENT,
    CONTROL_COMPONENT, SPECIFICATION_TYPE).

    Returns:
        The configured spaCy ``Language`` object.
    """
    nlp = spacy.load("en_core_web_sm")
    ruler = nlp.add_pipe("entity_ruler", before="ner")

    # Document-code prefixes. The colon-terminated pattern only uses the short list.
    short_prefixes = ["mce", "mch", "mcx", "mcg", "tr"]
    all_prefixes = short_prefixes + [
        "mcs", "oa", "og", "os", "pa", "pt", "pl", "re", "rg", "se", "trg", "trh",
    ]

    # Small builders for the token-spec dictionaries understood by the entity ruler.
    def lower_in(options):
        return {"LOWER": {"IN": list(options)}}

    def lower_is(word):
        return {"LOWER": word}

    def lower_re(expression):
        return {"LOWER": {"REGEX": expression}}

    def text_re(expression):
        return {"TEXT": {"REGEX": expression}}

    # (label, token specs) pairs, kept in the same order as they are registered.
    pattern_table = [
        # Prefix, four digits, then a colon token
        ("TECH_DOC", [lower_in(short_prefixes), text_re(r"^\d{4}$"), lower_in([":"])]),
        # Connected form with optional trailing letter (mce0107b or mce0107)
        ("TECH_DOC", [
            lower_re(r"^(mce|mch|mcx|mcg|tr|mcs|oa|og|os|pa|pt|pl|re|rg|se|trg|trh)\d{4}[a-z]?$"),
        ]),
        # Space after prefix (MCE 0107B or MCE 0107)
        ("TECH_DOC", [lower_in(all_prefixes), text_re(r"^\d{4}[A-Za-z]?$")]),
        # Prefix, number and a separate single letter (mce 0107 b)
        ("TECH_DOC", [lower_in(all_prefixes), text_re(r"^\d{4}$"), lower_re(r"^[a-z]$")]),
        # Prefix followed by a token that starts with four digits and a backslash
        ("TECH_DOC", [lower_in(all_prefixes), text_re(r"^\d{4}(?=\\)")]),

        # SYSTEM_COMPONENT
        ("SYSTEM_COMPONENT", [lower_re(r"^(midas|nmcs2?|hadecs|hatms)$")]),
        ("SYSTEM_COMPONENT", [lower_is("midas"), lower_is("gold")]),

        # HARDWARE_COMPONENT
        ("HARDWARE_COMPONENT", [
            lower_re(r"^(cabinet|plinth|lantern|post|frame|skirt)$"),
            lower_is("type"),
            text_re(r"^\d+[a-z]?$"),
        ]),
        ("HARDWARE_COMPONENT", [lower_re(r"^(ms[1-4]r?|ami|ert)$")]),
        ("HARDWARE_COMPONENT", [
            lower_in(["indicator", "signal", "sensor", "detector", "camera", "telephone"]),
        ]),

        # COMMUNICATION_COMPONENT
        ("COMMUNICATION_COMPONENT", [lower_re(r"^(rs485|rs422|tcp\/ip|lan|wan)$")]),
        ("COMMUNICATION_COMPONENT", [lower_is("rs"), text_re(r"^(485|422)$")]),
        ("COMMUNICATION_COMPONENT", [
            lower_is("ethernet"),
            lower_in(["lan", "connection", "interface"]),
        ]),

        # SUBSYSTEM_COMPONENT
        ("SUBSYSTEM_COMPONENT", [
            lower_in(["signal", "message", "meteorological", "tidal", "tunnel"]),
            lower_is("subsystem"),
        ]),
        ("SUBSYSTEM_COMPONENT", [lower_re(r"^(lcc|pdu|cobs|ceclb|ceceb|cecr)$")]),

        # CONTROL_COMPONENT
        ("CONTROL_COMPONENT", [
            lower_in(["control", "monitoring", "outstation", "instation"]),
            lower_in(["system", "unit", "equipment", "interface"]),
        ]),
        ("CONTROL_COMPONENT", [lower_is("cctv"), lower_in(["system", "camera", "equipment"])]),

        # SPECIFICATION_TYPE
        ("SPECIFICATION_TYPE", [
            lower_in(["requirements", "specification", "instructions", "overview", "process"]),
            lower_is("document"),
        ]),
        ("SPECIFICATION_TYPE", [lower_is("technical"), lower_is("requirements")]),
    ]

    patterns = [{"label": label, "pattern": specs} for label, specs in pattern_table]

    ruler.add_patterns(patterns)
    return nlp

In [None]:
import pdfplumber
from io import BytesIO

def extract_text(binary_data, page_separator=""):
  """
  Extract the plain text of every page of a PDF held in memory.

  Args:
      binary_data: Raw PDF bytes.
      page_separator: String inserted between the text of consecutive pages.
          Defaults to "" so existing callers get the same concatenation as
          before; pass "\n" to stop the last line of one page fusing with the
          first line of the next.

  Returns:
      The text of all pages joined by ``page_separator``.
  """
  page_texts = []

  with pdfplumber.open(BytesIO(binary_data)) as pdf:
    for page in pdf.pages:
      # extract_text() can yield None for a page without extractable text
      # (e.g. a scanned image); treat that as an empty page instead of crashing.
      page_texts.append(page.extract_text() or "")

  return page_separator.join(page_texts)


In [None]:
# Take the "content" value of the first collected row of the Spark DataFrame.
# NOTE(review): `pdf_df` is not defined in any visible cell — presumably it is created in the
# file-paths section above; confirm this cell still works after a kernel restart.
binary_pdf = pdf_df.select("content").collect()[0]["content"] 

# Convert that value to plain text with extract_text.
extracted_text = extract_text(binary_pdf) 

In [None]:
# Build the pipeline once; later cells pass this `nlp` object to the processing and debug helpers.
nlp = create_nlp_pipeline()

In [None]:
def structure_aware_chunks(text: str, max_tokens: int = 300) -> List[str]:
    """
    Chunk text along its visible structure: headers, page indicators and a token budget.

    The text is processed in two passes. The first pass walks the lines and
    closes a section whenever a header line starts, a page-indicator line is
    seen (the indicator ends its section), or the running token count reaches
    ``max_tokens``. The second pass glues small neighbouring sections together,
    but never across a section that contains a page indicator.

    Args:
        text: Input text to split.
        max_tokens: Approximate token budget per chunk (cl100k_base tokens).
            A single line longer than the budget is kept whole.

    Returns:
        List of text chunks in document order.
    """
    import tiktoken
    import re

    tokenizer = tiktoken.get_encoding("cl100k_base")

    def count_tokens(fragment):
        return len(tokenizer.encode(fragment))

    # Lines that open a new section
    heading_res = [
        r'^#+\s+.+$',                      # Markdown headers
        r'^[A-Z0-9][.)\s]+[A-Z].*$',       # Numbered sections like "1. INTRODUCTION"
        r'^[IVXLCDMivxlcdm]+\.\s+.+$',     # Roman numeral sections
        r'^Section\s+\d+[.:]\s+.+$',       # Explicit section markers
        r'^\d+\.\d+\s+.+$'                 # Decimal numbering like "1.2 Configuration"
    ]

    # Lines (lower-cased) that look like page indicators, possibly with a document reference
    page_marker_res = [
        r'.*page\s+\d+.*',                 # "page X" indicators
        r'.*\b(mce|mch|mcx|mcg|tr)\s*\d{4}[a-z]?\b.*\bpage\b', # Doc ref + page
        r'.*\d+\s*of\s*\d+\s*$'            # "X of Y" page counters
    ]

    def is_page_line(lowered_line):
        return any(re.match(expr, lowered_line, re.MULTILINE) for expr in page_marker_res)

    def mentions_page(lowered_block):
        return any(re.search(expr, lowered_block) for expr in page_marker_res)

    def is_heading(stripped_line):
        return any(re.match(expr, stripped_line, re.MULTILINE) for expr in heading_res)

    # ---- Pass 1: cut the text into structural sections ----
    sections = []
    pending = []
    pending_tokens = 0

    for position, raw_line in enumerate(text.split('\n')):
        # Blank lines are only kept inside a section that has already started
        if not raw_line.strip():
            if pending:
                pending.append(raw_line)
            continue

        if is_page_line(raw_line.lower()):
            # Make sure the previous line is separated from the indicator by whitespace
            if position > 0 and pending:
                previous = pending[-1]
                if not (previous.endswith(' ') or previous.endswith('\n')):
                    pending[-1] = previous + ' '

            marker = raw_line + ' '
            pending.append(marker)
            pending_tokens += count_tokens(marker)

            # A page indicator always closes the section it belongs to
            sections.append('\n'.join(pending))
            pending = []
            pending_tokens = 0
            continue

        line_size = count_tokens(raw_line)
        opens_section = is_heading(raw_line.strip()) or (pending_tokens + line_size > max_tokens)

        # Close the running section before a header or before overflowing the budget
        if pending and opens_section:
            sections.append('\n'.join(pending))
            pending = []
            pending_tokens = 0

        pending.append(raw_line)
        pending_tokens += line_size

        # Budget reached: close the section right away
        if pending_tokens >= max_tokens:
            sections.append('\n'.join(pending))
            pending = []
            pending_tokens = 0

    if pending:
        sections.append('\n'.join(pending))

    # ---- Pass 2: merge small sections, never across a page indicator ----
    merged = []
    chunk_text = ""
    chunk_tokens = 0

    for block in sections:
        block_has_page = mentions_page(block.lower())
        block_tokens = count_tokens(block)

        at_page_boundary = block_has_page or mentions_page(chunk_text.lower())
        is_small = block_tokens < max_tokens * 0.25
        fits = chunk_tokens + block_tokens <= max_tokens

        if not at_page_boundary and is_small and fits:
            chunk_text = chunk_text + "\n\n" + block if chunk_text else block
            chunk_tokens += block_tokens
            continue

        # Otherwise emit what has been collected and start over from this block
        if chunk_text:
            merged.append(chunk_text)
        chunk_text = block
        chunk_tokens = block_tokens

    if chunk_text:
        merged.append(chunk_text)

    return merged

In [None]:
def semantic_cluster_chunks(text: str, max_tokens: int = 300, similarity_threshold: float = 0.7) -> List[str]:
    """
    Group paragraphs by TF-IDF cosine similarity and emit one chunk per group.

    Paragraphs are the blank-line separated blocks of ``text``. Starting from
    each unassigned paragraph, every later unassigned paragraph whose
    similarity to any current group member reaches ``similarity_threshold``
    joins the group. Groups that exceed ``max_tokens`` are split greedily after
    ordering their paragraphs by similarity to the group's first paragraph.

    Args:
        text: Input text; a list of strings is also accepted and joined with spaces.
        max_tokens: Maximum cl100k_base tokens per chunk (a single oversized
            paragraph is still emitted whole).
        similarity_threshold: Minimum cosine similarity for two paragraphs to
            be grouped together.

    Returns:
        List of chunks. On any error (including a missing dependency) a message
        is printed and the input is returned as a single chunk.
    """
    try:
        import tiktoken
        import numpy as np
        from sklearn.feature_extraction.text import TfidfVectorizer
        from sklearn.metrics.pairwise import cosine_similarity
        import re

        # Accept a list of strings as well
        if isinstance(text, list):
            text = " ".join(text)

        tokenizer = tiktoken.get_encoding("cl100k_base")

        # Blank-line separated, non-empty paragraphs
        blocks = [part for part in re.split(r'\n\s*\n', text) if part.strip()]
        if not blocks:
            return []

        block_sizes = [len(tokenizer.encode(part)) for part in blocks]

        # Nothing to compare against
        if len(blocks) == 1:
            return blocks

        tfidf = TfidfVectorizer(stop_words='english').fit_transform(blocks)
        similarity = cosine_similarity(tfidf)

        # Greedy grouping: each seed pulls in every still-free paragraph that
        # is similar enough to any paragraph already in its group.
        total = len(blocks)
        claimed = set()
        groups = []
        for seed in range(total):
            if seed in claimed:
                continue

            members = [seed]
            claimed.add(seed)

            for candidate in range(total):
                if candidate in claimed:
                    continue
                for member in members:
                    if similarity[member, candidate] >= similarity_threshold:
                        members.append(candidate)
                        claimed.add(candidate)
                        break

            groups.append(sorted(members))

        # Turn groups into chunks, splitting the ones that exceed the token budget
        result = []
        for members in groups:
            if sum(block_sizes[idx] for idx in members) <= max_tokens:
                result.append("\n\n".join(blocks[idx] for idx in members))
                continue

            # Most similar to the group's first paragraph come first
            anchor = members[0]
            ordered = sorted(members, key=lambda idx: -similarity[anchor, idx])

            bucket = []
            bucket_size = 0
            for idx in ordered:
                size = block_sizes[idx]
                if bucket and bucket_size + size > max_tokens:
                    result.append("\n\n".join(bucket))
                    bucket = []
                    bucket_size = 0
                bucket.append(blocks[idx])
                bucket_size += size

            if bucket:
                result.append("\n\n".join(bucket))

        return result
    except Exception as e:
        # Best effort: fall back to the input as one chunk
        print(f"Semantic chunking failed with error: {e}")
        return text if isinstance(text, list) else [text]

In [None]:
def recursive_character_chunks(text: str, max_chunk_size: int = 300) -> List[str]:
    """
    Split text into consecutive chunks of at most ``max_chunk_size`` characters, without overlap.

    For each chunk the split point is chosen, in order of preference, at the
    last paragraph break ("\\n\\n"), the last sentence end / newline, or the
    last space inside the window; if none lies far enough into the window the
    text is cut hard at ``max_chunk_size``. Leading whitespace of the remainder
    is dropped after a boundary split.

    The name is kept for compatibility, but the work is done iteratively: the
    earlier one-recursion-per-chunk form raised ``RecursionError`` on long
    documents (roughly a thousand chunks).

    Args:
        text: Input text to split.
        max_chunk_size: Maximum characters per chunk (must be positive when the
            text needs splitting).

    Returns:
        List of text chunks. Text that already fits is returned as ``[text]``.
        A remainder that is empty after whitespace trimming is not emitted.

    Raises:
        ValueError: If the text needs splitting and ``max_chunk_size`` is not positive.
    """
    # Text that already fits is a single chunk
    if len(text) <= max_chunk_size:
        return [text]

    if max_chunk_size <= 0:
        raise ValueError("max_chunk_size must be a positive integer")

    sentence_separators = (". ", "! ", "? ", ".\n", "!\n", "?\n", "\n")
    half = max_chunk_size // 2
    third = max_chunk_size // 3

    chunks = []
    remaining = text
    while len(remaining) > max_chunk_size:
        window = remaining[:max_chunk_size]

        # 1) Paragraph break in the second half of the window
        cut = window.rfind("\n\n")
        if cut != -1 and cut > half:
            head = remaining[:cut]
            remaining = remaining[cut:].lstrip()
        else:
            # 2) Sentence end or newline in the second half of the window
            for sep in sentence_separators:
                cut = window.rfind(sep)
                if cut != -1 and cut > half:
                    # Keep the first separator character with the chunk
                    head = remaining[:cut + 1]
                    remaining = remaining[cut + 1:].lstrip()
                    break
            else:
                # 3) Word boundary, 4) hard cut
                cut = window.rfind(" ")
                if cut != -1 and cut > third:
                    head = remaining[:cut]
                    remaining = remaining[cut:].lstrip()
                else:
                    head = window
                    remaining = remaining[max_chunk_size:]

        chunks.append(head)

    # Skip a remainder that became empty after trimming whitespace
    if remaining:
        chunks.append(remaining)

    return chunks


  

def recursive_token_chunks(text: str, max_tokens: int = 200) -> List[str]:
    """
    Split text into consecutive chunks of at most ``max_tokens`` cl100k_base tokens.

    The first ``max_tokens`` tokens of the remaining text are decoded into a
    window; the split point is the last paragraph break, sentence end /
    newline, or space that lies far enough into that window, falling back to a
    hard cut at the token limit.

    Two problems of the earlier implementation are addressed:
    * The remainder used to be obtained by re-encoding the first chunk and
      slicing the original token list by that count. Tokenising a prefix does
      not necessarily yield a prefix of the original tokens, so characters
      could be dropped or duplicated. The remainder is now taken by character
      offset from the text itself.
    * One recursion level per chunk raised ``RecursionError`` on long
      documents; the loop below is iterative.

    Args:
        text: Input text to split.
        max_tokens: Maximum tokens per chunk (must be positive when the text
            needs splitting).

    Returns:
        List of text chunks in document order.

    Raises:
        ValueError: If the text needs splitting and ``max_tokens`` is not positive.
    """
    import tiktoken
    encoding = tiktoken.get_encoding("cl100k_base")

    sentence_separators = (". ", "! ", "? ", ".\n", "!\n", "?\n", "\n")

    chunks = []
    remaining = text
    while True:
        tokens = encoding.encode(remaining)

        # What is left fits in a single chunk
        if len(tokens) <= max_tokens:
            chunks.append(remaining)
            return chunks

        if max_tokens <= 0:
            raise ValueError("max_tokens must be a positive integer")

        # Text covered by the first max_tokens tokens
        window = encoding.decode(tokens[:max_tokens])
        half = len(window) // 2

        # Character offset at which to cut the window; -1 means "no good boundary"
        cut = -1
        para = window.rfind("\n\n")
        if para != -1 and para > half:
            cut = para
        else:
            for sep in sentence_separators:
                pos = window.rfind(sep)
                if pos != -1 and pos > half:
                    cut = pos + len(sep)  # keep the separator with the chunk
                    break
            else:
                space = window.rfind(" ")
                if space != -1 and space > len(window) // 3:
                    cut = space

        if cut == -1:
            # Hard cut at the token limit
            chunks.append(window)
            remaining = encoding.decode(tokens[max_tokens:])
        else:
            # NOTE(review): assumes the decoded window is a character prefix of the
            # remaining text up to `cut` (expected for a byte-level tokenizer, since
            # `cut` sits at or right after an ASCII separator) — confirm.
            chunks.append(window[:cut])
            remaining = remaining[cut:]

In [None]:
import tiktoken

def token_chunks_cl100k(text: str, tokens_per_chunk: int = 200) -> List[str]:
  """
  Cut text into fixed-size windows of cl100k_base tokens.

  The text is encoded once; every run of ``tokens_per_chunk`` consecutive
  tokens (the last run may be shorter) is decoded back into a string.
  Boundaries ignore words and sentences.

  Returns:
      List of decoded chunks, empty when the text encodes to no tokens.
  """
  codec = tiktoken.get_encoding("cl100k_base")
  token_ids = codec.encode(text)
  window_starts = range(0, len(token_ids), tokens_per_chunk)
  return [
    codec.decode(token_ids[start:start + tokens_per_chunk])
    for start in window_starts
  ]


In [None]:
import re
from langchain.text_splitter import CharacterTextSplitter



def get_text_chunks(text):
  """
  Split text with a CharacterTextSplitter configured for newline
  separators, a chunk size of 1000 (measured with ``len``) and no overlap.

  Returns:
      Whatever ``split_text`` returns for ``text``.
  """
  splitter_options = dict(
    chunk_size=1000,
    chunk_overlap=0,
    separator="\n",
    length_function=len,
  )
  return CharacterTextSplitter(**splitter_options).split_text(text)


def get_text_chunks2(text):
    """
    Treat every non-blank line as its own chunk.

    Returns:
        The lines of ``text`` with surrounding whitespace stripped, skipping
        lines that are empty after stripping.
    """
    kept = []
    for raw_line in text.split('\n'):
        trimmed = raw_line.strip()
        if trimmed:
            kept.append(trimmed)
    return kept




In [None]:

def clean_text(text):
  """
  Normalise text for matching: collapse every run of whitespace (including
  newlines) to a single space, trim the ends and lower-case the result.

  A list of strings is accepted too and is joined with spaces first.
  """
  source = " ".join(text) if isinstance(text, list) else text
  return " ".join(source.split()).lower()

def lower_text(text):
  """Return ``text`` converted to lower case; whitespace is left untouched."""
  return text.lower()



In [None]:
def process_pdf_text(extracted_text: str) -> List[str]:
    """
    Chunk the extracted PDF text and normalise each chunk.

    The text is split with ``get_text_chunks`` and every resulting chunk is
    passed through ``clean_text``.

    Returns:
        The cleaned chunks, in the order produced by the splitter.
    """
    return [clean_text(piece) for piece in get_text_chunks(extracted_text)]

In [None]:


# Chunk and clean the text extracted earlier; `main_text` is a list of cleaned chunk strings.
main_text = process_pdf_text(extracted_text)

In [None]:
# Visual check of the chunking: print the chunk count, then every chunk between dashed rules.
print("Number of chunks:", len(main_text))
for i, chunk in enumerate(main_text, 1):  # number chunks from 1 for readability
    print(f"\nCHUNK {i}:")
    print("-" * 50)
    print(chunk)
    print("-" * 50)

In [None]:
# Test the cleaning function
test_text = """
The TR 2144       M:3952 document specifies requirements.
Also see MCE 1234:123 and TR 2144 M for details!.
Some MCE1234B document and TR 2144M reference.
"""

print("Original text:")
print(test_text)
print("\nCleaned text:")
print(clean_text(test_text))

In [None]:

def debug_document_code(nlp, text):
    """
    Print a diagnostic report of how the pipeline tokenises and labels ``text``.

    The report has three parts: every token with its character span and entity
    type, every entity with its label and whether all of its tokens carry that
    label, and finally the tokens that belong to no entity.

    Args:
        nlp: Callable pipeline that turns a string into a document.
        text: The string to analyse.
    """
    print(f"\nAnalyzing document code: '{text}'")

    parsed = nlp(text)

    # Part 1: raw tokenisation
    print("\nTokenization details:")
    for tok in parsed:
        tag = tok.ent_type_
        span_end = tok.idx + len(tok.text)
        print(f"Token: '{tok.text}'")
        print(f"  Position: {tok.idx} to {span_end}")
        print(f"  Is part of entity: {tag != ''}")
        print(f"  Entity type: {tag if tag else 'None'}")
        print()

    # Part 2: entities as assembled by the pipeline
    print("\nComplete entities found:")
    for span in parsed.ents:
        fully_labelled = all(member.ent_type_ == span.label_ for member in span)
        print(f"Entity: '{span.text}'")
        print(f"  Label: {span.label_}")
        print(f"  Includes all tokens: {fully_labelled}")
        print()

    # Part 3: tokens that did not end up in any entity
    leftovers = [tok.text for tok in parsed if not tok.ent_type_]
    if leftovers:
        print("\nUnmatched tokens:")
        print(", ".join(leftovers))
     

In [None]:
small_chunk = """
re 1110 The MCE0107B document connects to RS485 while MCH 1070B uses RS 422.
MIDAS Gold system interfaces with the Ethernet LAN through Cabinet Type 600.
The Signal Subsystem monitors the CCTV System and AMI-EE devices. (AMI bobo)
"""

medium_chunk = """
The assembly manual for MCH0107B specifies that RS485 components must be configured 
alongside RS422 adapters, with additional references to OA 0150C outlined in 
Section 4.3 of the document. TR 2043 further details the integration with 
Ethernet LAN systems, enabling high-speed communication protocols compliant 
with IEEE 802.3 standards. The MIDAS Gold system connects through Cabinet Type 600A 
to the Control System, while monitoring occurs via the CCTV System and Signal Subsystem.

The MCE 1070B requirements document describes interfacing with NMCS2 through standard 
protocols. Local connections use RS 485 for primary communication, supported by 
AMI-EE devices and monitored by the Outstation Equipment.
"""

large_chunk = """
Technical Requirements Document: System Integration Specification

1. Overview
The MCE0107B specification, in conjunction with MCH 1070B and TR 2043, defines the 
integration requirements for the MIDAS Gold system. Primary communication occurs through 
RS485 interfaces, while secondary protocols utilize RS 422 and Ethernet LAN connections.

2. Hardware Components
Cabinet Type 600 houses the main control units, with additional Cabinet Type 450A units 
for auxiliary systems. The AMI-EE devices interface with MS3R indicators and standard 
AMI units. Signal sensors and detector units provide environmental monitoring capabilities.

3. System Architecture
The NMCS2 framework integrates with HADECS and HATMS subsystems through standardized 
interfaces. The Signal Subsystem and Message Subsystem handle primary control operations, 
while the Meteorological Subsystem provides environmental data. CECLB and CECEB units 
coordinate with the PDU for power distribution.

4. Communication Infrastructure
Primary TCP/IP networks connect through LAN and WAN interfaces. The Ethernet LAN provides 
local connectivity, supported by RS485 and RS 422 serial connections. Each Control System 
interfaces with its respective Monitoring Unit through dedicated channels.

5. Monitoring and Control
The CCTV System provides visual monitoring capabilities, integrated with the Control System 
and Monitoring Equipment. Outstation Equipment handles remote operations, while the 
Instation Interface manages central control functions.

6. Reference Documentation
MCE 1080B describes the detailed protocols, while TR 2044 and MCH 1075B provide 
supplementary specifications. The Requirements Document and Technical Requirements 
specify additional integration parameters.

7. System Components
Multiple AMI-EE installations connect through Cabinet Type 600B units, monitored by 
the Signal Subsystem. The MIDAS Gold deployment utilizes standard NMCS2 protocols for 
primary operations.
"""



In [None]:

# Entity labels reported by process_text / print_document_results; entities with any other label are ignored.
custom_labels = ['TECH_DOC', 'SYSTEM_COMPONENT', 'HARDWARE_COMPONENT', 'COMMUNICATION_COMPONENT', 'SUBSYSTEM_COMPONENT', 'CONTROL_COMPONENT', 'SPECIFICATION_TYPE']


def process_text(nlp, text: str):
    """
    Run the pipeline on ``text`` and summarise the entities carrying a custom label.

    Only entities whose label appears in ``custom_labels`` are considered.

    Args:
        nlp: Callable pipeline that turns a string into a document with ``ents``.
        text: The text to analyse.

    Returns:
        A tuple ``(entities, entity_type_counts, entity_frequencies)``:
        * entities: one dict per mention with 'text', 'label', 'original_text'
          (the slice of ``text`` at the entity's character span), 'start', 'end';
        * entity_type_counts: mentions per label, with every custom label present;
        * entity_frequencies: dict keyed by ``(text, label)`` holding 'count',
          'text' and 'label' for each distinct entity.
    """
    doc = nlp(text)

    type_totals = dict.fromkeys(custom_labels, 0)   # mentions per label
    mention_stats = {}                              # mentions per distinct (text, label)
    found = []

    for span in doc.ents:
        label = span.label_
        if label not in custom_labels:
            continue

        begin, finish = span.start_char, span.end_char
        found.append({
            'text': span.text,
            'label': label,
            'original_text': text[begin:finish],
            'start': begin,
            'end': finish,
        })

        type_totals[label] += 1

        # Key on text AND label so identical text under different labels stays separate
        stats = mention_stats.setdefault(
            (span.text, label),
            {'count': 0, 'text': span.text, 'label': label},
        )
        stats['count'] += 1

    return found, type_totals, mention_stats

def print_document_results(entities, type_counts, frequencies):
    """
    Print the three-part report produced from ``process_text`` results.

    Args:
        entities: List of entity dicts ('text', 'label', 'original_text', 'start', 'end').
        type_counts: Mapping label -> number of mentions; labels with a count of 0 are not printed.
        frequencies: Mapping (text, label) -> dict with 'count', 'text', 'label'.

    The frequency section is grouped by label in the order of ``custom_labels``
    and sorted by count, highest first, within each label.
    """
    wide_rule = "-" * 50
    narrow_rule = "-" * 30

    print("\nDocument Processing Results:")
    print(wide_rule)

    # Part 1: every mention with its position
    print("Entities Found in Context:")
    for record in entities:
        print(f"Found: {record['text']} ({record['label']})")
        print(f"Original text: '{record['original_text']}'")
        print(f"Position: {record['start']} to {record['end']}")
        print(narrow_rule)

    # Part 2: totals per label
    print("\nEntity Type Summary:")
    print(wide_rule)
    for label, total in type_counts.items():
        if not total > 0:
            continue
        print(f"{label}: {total} total mentions")

    # Part 3: per-entity frequencies, grouped by label
    print("\nDetailed Entity Frequencies:")
    print(wide_rule)

    by_label = {}
    for (_, label), info in frequencies.items():
        by_label.setdefault(label, []).append(info)

    for label in custom_labels:
        if label not in by_label:
            continue
        print(f"\n{label}:")
        ranked = sorted(by_label[label], key=lambda item: item['count'], reverse=True)
        for item in ranked:
            print(f"  {item['text']}: {item['count']} mentions")

    


In [None]:

# Initialize the NLP pipeline
#nlp = create_nlp_pipeline()

# Exercise the TECH_DOC patterns on a line that mixes the supported code spellings.
# The backslash in "TR 2163\I:410" is written as "\\" because "\I" is not a valid escape
# sequence (it triggers a Deprecation/SyntaxWarning); the resulting string is unchanged.
debug_document_code(nlp, "MCE0107B , MCE2344B x  TR 0543 C more, MCE 2343: bro nn, MCE 3342C cap, MCE 3333 B bag, MCE3234 A stir tr 2144 m:3952 test this he Lane SAC Priority Data section contains the priority table for lane SAC settings (see TR 2163\\I:410). ")

    
   

In [None]:
# `small_chunk` is assigned a string literal earlier in the notebook, so str() leaves it unchanged here.
s_chunk = str(small_chunk)
 
# Extract the custom entities from the sample and print the report.
entities, counts, freq = process_text(nlp, s_chunk)
print_document_results(entities, counts, freq)

# Token-level view of the same sample, useful when a pattern does not fire as expected.
debug_document_code(nlp, s_chunk)

In [None]:
 
# `main_text` is a list of cleaned chunks. str() on a list yields its repr (brackets,
# quotes and escaped characters between chunks), which would end up in the analysed text.
# Join the chunks with spaces instead, the same way clean_text flattens a list.
big_chunk = " ".join(main_text)
 
entities, counts, freq = process_text(nlp, big_chunk)
print_document_results(entities, counts, freq)

In [None]:
#print(big_chunk)

In [None]:

manual_counts1 = {'TR 1100': 10, 'TR 2070': 8, 'TR 2142': 4, 'TR 2043': 3, 'TR 2067': 4, 'TR 2130': 3, 'TR2070': 3, 'TR 2072': 2, 'TR2139': 1, 'MCE 1349': 3, 'MCE0110': 1, 'MCE0107': 3, 'MCH 1618': 2, 'MCX 0731': 1, 'MCX 0925': 1, 'MCX 0910': 1, 'TRH 1679':1, 'TRH 1680':4, 'TRG 0500':1}


manual_counts2 = {
        'TR 2033': 5,'TR 2043': 14,'TR 1100': 12,'TR 2070': 6,'TR 2130': 5,'TR 2142': 5,'TR 2067': 3,'TR2070': 4,'TR 2072': 2,'TR2139': 1,'TR 1173': 1,'TR 1238': 1,'TR 2110': 1,'MCX1031':17,'MCX0920': 3,'MCX0918': 1,'MCX0733': 1,'MCH 1618': 2,'MCH 1689': 2,'MCH1349': 1,'MCH 1621': 1,'MCE 0110': 1,'MCE0110': 2,'MCE0107': 1,'MCG 1069': 1,'TRH 1679':2, 'TRG 0500':2}

manual_counts3 = {'mce 1157': 2, 'mce 1157 a': 1, 'mce 1157 b': 1, 'mce 1157 c': 1, 'mce 1157 d': 1, 'mce 1157 e': 1, 'tr 1100': 7, 'mcx 0708': 4, 'tr 2130': 4, 'mch 1616': 3, 'mch 1618': 3, 'tr 2199': 3, 'mcg 1107': 2, 'mch 1349': 2, 'tr2199': 1}

manual_counts4 = {'mch 1744': 23, 'tr 2144': 11, 'mcg 1091': 3, 'tr2144': 3, 'mcg 1069': 3, 'mcg 1092': 2, 'mcg 1093': 1, 'mch 1714': 1, 'mch 1753': 1, 'mch 1748': 1}

manual_counts5 = {'tr 2199': 138, 'tr 2130': 16, 'tr 1100': 14, 'tr 2067': 13, 'tr 2070': 10, 'mcg 1069': 5, 'tr 2516': 5, 'mce 1137': 4, 'mch 1689': 3, 'mcx 0028': 3, 'tr 2195': 2, 'mcx 0071': 2, 'tr 2045': 2, 'mcg 1107': 1, 'mch 1616': 1}

manual_counts6 = {'mcg 1069': 3, 'mce 0110': 3, 'tr 1100': 2, 'tr 2199': 2, 'mce 0107': 2, 'tr 2195': 2, 'mce 2214': 2, 'mch 1616': 2, 'mcg 1202': 1}

manual_counts7 = {'mch 1753': 13, 'tr 2144': 4, 'mch 1744': 1}

manual_counts8 = {'mch 1748': 39, 'tr 2163': 23, 'tr 2133': 5, 'mch 1726': 4, 'tr 2139': 3, 'mch 1689': 3, 'mch 1617': 2, 'mch 1618': 2, 'mch 1655': 2, 'tr 2072': 1, 'mch 1700': 2, 'mch 1759': 1, 'mch1619': 1, 'mch 1124': 1}

manual_counts9 = {'mch 1748': 38, 'mch 1689': 14, 'mce 2103': 4, 'mch 1700': 4, 'mch 1616': 3, 'tr 2072': 3, 'tr 2133': 2, 'mch 1798': 1, 'mch 1619': 1}

manual_counts10 = {'mch2624': 4, 'mch1689': 2, 'mch 2629': 1}

manual_counts11 = {"mce 2240": 14, "mcg 1110": 8, "mce 2536": 5, "tr 1100": 7, "tr 2130": 4, "mce 1126": 4, "mce 2135": 3, "tr 2180": 3, "mce 1233": 3, "mce 0110": 2, "mch 1514": 2, "tr 2189": 3, "mch 1619": 1}

manual_counts12 = {"mce 2242": 8, "mce 2245": 6, "mce 2240": 4, "mce2247": 4, "mce 2247": 3, "mce 2239": 1, "mce 2241": 1, "mce 2135": 1, "mce 2013": 1, "mce 2216": 1, "mce 1959": 1, "mch 1959": 4, "mch 1960": 1, "mch 1970": 1, "mcg 1110": 1}

manual_counts13 = {"tr 2145": 18, "tr 2172": 9, "tr 2173": 8, "tr 1100 c": 1, "tr 2145 d": 1, "tr 2172 d": 1, "tr 2173 j": 1, "re 2177 g": 1}

manual_counts14 = {"mch 1700": 18, "mch 1596": 2, "mcg 1069": 7, "mce 2103": 5, "mcg 1075": 3, "mcg 1086": 3, "mcg 1091": 3, "mcg 1077": 2}

manual_counts18 = {"mch 1781": 2, "mce2242": 3, "mcg 1110": 1, "mch 1960": 1, "mch 1970": 1, "mch 1959": 1, "mch 1731": 1, "mce 2239": 1, "mce 2240": 1, "mce 2241": 1, "mce 2216": 1, "mce 2247": 1, "mce 2242": 1, "mce 2246": 1}


manual_counts19={'mch 1760':9, 'mch1865':6, 'mch 1952':3, 'mch 1951':3, 'mch 1957':3, 'mch 1696':2, 'mch 1867':2, 'mch 1857':1,}

manual_counts20 = {"mch 2470": 6, "mch1965": 5, "mch 2474": 4, "mch 2472": 3, "mch 2473": 3, "mch1349": 3, "mch2471": 2, "mch1514": 2, "mch1144": 2, "mch1147": 2, "mch1148": 2, "mch 1965": 1, "mch2472": 1, "mch 2471": 1}




In [None]:
copilot_count1 = {'MCE0110': 1,'MCH1618': 2,'MCE0107': 3,'TR2043': 3,'TR2067': 4,'TR2070': 11,'TR2072': 2,'TR1100': 9,'MCE1349': 3,'TR2130': 3,'MCX0731': 1,'MCX0925': 1,'TR2033': 2,'MCX0910': 1,'TR2142': 4,'MCG1069': 1,'TR2139': 1
}

copilot_count2 = {'MCE0107': 1, 'MCE0110': 3, 'MCH1618': 2, 'MCX1031': 15, 'TR2043': 13, 'MCH1689': 2, 'TR2070': 9, 'MCX0920': 3, 'TR1100': 12, 'MCH1349': 1, 'TR2130': 4, 'TR2067': 3, 'MCX0918': 1, 'TR2033': 4, 'MCX0733': 1, 'TR2142': 5, 'MCG1069': 1, 'TR2072': 2, 'TR2139': 1, 'MCH1621': 1, 'TR1173': 1, 'TR1238': 1, 'TR2110': 1, 'MCE 0107 B': 1, 'MCX 1031': 2}

copilot_count3 = {'MCE1157': 7, 'MCH1616': 3, 'MCH1618': 3, 'TR2199': 4, 'MCG1107': 2, 'TR1100': 7, 'MCX0708': 4, 'MCH1349': 2, 'TR2130': 4}

copilot_count4 = {'MCG1093': 1, 'TR2144': 14, 'MCH1744': 23, 'MCG1069': 3, 'MCG1091': 3, 'MCG1092': 2, 'MCH1714': 1, 'MCH1753': 1, 'MCH1748': 1}

copilot_count6 = {'MCG1202': 1, 'TR1100': 2, 'MCG1069': 3, 'TR2199': 2, 'MCE0107': 2, 'TR2195': 2, 'MCE2214': 2, 'MCE0110': 3, 'MCH1616': 2}

copilot_count7 = {'MCH1744': 1, 'TR2144': 4, 'MCH1753': 11}

copilot_count8= {'MCH1759': 1, 'MCH1726': 4, 'MCH1617': 2, 'MCH1618': 2, 'MCH1655': 2, 'TR2139': 3, 'TR2163': 23, 'MCH1748': 39, 'MCH1689': 3, 'TR2133': 5, 'MCH1700': 2, 'TR2072': 1, 'MCH1619': 1, 'MCH1124': 1}

copilot_count9 = {'MCH1798': 1, 'MCH1616': 3, 'MCH1748': 38, 'MCE2103': 4, 'MCH1689': 14, 'TR2072': 3, 'MCH1700': 4, 'TR2133': 1, 'MCH1619': 1}

copilot_count10 = {'MCH2629': 1, 'MCH2624': 4, 'MCH1689': 2}


In [None]:
def get_accuracies_for_all_pdfs(nlp, dataset="manual"):
    """
    Score TECH_DOC recognition for every PDF in one of the built-in file lists.

    Each PDF is read through Spark's ``binaryFile`` reader, converted to text
    with ``extract_text``, chunked with ``process_pdf_text`` and compared
    against its reference counts by ``validate_tech_doc_recognition``.

    Args:
        nlp: spaCy pipeline forwarded to ``validate_tech_doc_recognition``.
        dataset: Which (file, reference counts, name) list to evaluate:
            ``"manual"`` (default, files 1-10), ``"manual2"`` (files 11-14
            and 18-20) or ``"copilot"`` (files 1-10 without file5, scored
            against the ``copilot_count*`` dictionaries).

    Returns:
        Dict mapping each PDF name to its overall accuracy percentage.

    Raises:
        ValueError: If ``dataset`` is not one of the keys listed above.
    """
    # All three lists are built on every call, so every file*/manual_counts*/
    # copilot_count* name referenced below must already exist in the notebook.
    files_man = [
        (file1, manual_counts1, "MCE0110B"),
        (file2, manual_counts2, "MCE0107B"),
        (file3, manual_counts3, "MCE1157E"),
        (file4, manual_counts4, "MCG1093J"),
        (file5, manual_counts5, "MCG1107B"),
        (file6, manual_counts6, "MCG1202A"),
        (file7, manual_counts7, "MCH1744H"),
        (file8, manual_counts8, "MCH1759F"),
        (file9, manual_counts9, "MCH1798H"),
        (file10, manual_counts10, "MCH2629A")
    ]

    files_man2 = [
        (file11, manual_counts11, "MCE2241F"),
        (file12, manual_counts12, "MCE2246A"),
        (file13, manual_counts13, "MCG1090D"),
        (file14, manual_counts14, "MCG1094C"),
        (file18, manual_counts18, "MCH1734A"),
        (file19, manual_counts19, "MCH1948B"),
        (file20, manual_counts20, "MCH2475C")
    ]

    files_copilot = [
        (file1, copilot_count1, "MCE0110B"),
        (file2, copilot_count2, "MCE0107B"),
        (file3, copilot_count3, "MCE1157E"),
        (file4, copilot_count4, "MCG1093J"),
        (file6, copilot_count6, "MCG1202A"),
        (file7, copilot_count7, "MCH1744H"),
        (file8, copilot_count8, "MCH1759F"),
        (file9, copilot_count9, "MCH1798H"),
        (file10, copilot_count10, "MCH2629A")
    ]

    datasets = {
        "manual": files_man,
        "manual2": files_man2,
        "copilot": files_copilot,
    }
    if dataset not in datasets:
        raise ValueError(
            f"Unknown dataset {dataset!r}; expected one of {sorted(datasets)}"
        )

    pdf_accuracies = {}
    for file_path, reference_counts, pdf_name in datasets[dataset]:
        # Read PDF content
        pdf_df = spark.read.format("binaryFile").load(file_path).cache()
        try:
            binary_pdf = pdf_df.select("content").collect()[0]["content"]

            # Extract text and chunk it for the NER pass
            extracted_text = extract_text(binary_pdf)
            text_for_validation = process_pdf_text(extracted_text)

            # Only the overall percentage is kept; the per-ID table is discarded
            _, accuracy = validate_tech_doc_recognition(nlp, text_for_validation, reference_counts)
        finally:
            # Release the cached DataFrame even when extraction/validation fails
            pdf_df.unpersist()

        pdf_accuracies[pdf_name] = accuracy
        print(f"{pdf_name} Accuracy: {accuracy:.1f}%")

    return pdf_accuracies

In [None]:
from prettytable import PrettyTable
from typing import Dict, List, Tuple
from builtins import min, abs


def normalize_tech_doc_id(doc_id: str) -> str:
    """
    Return the canonical spelling of a technical document ID.

    All whitespace is removed and the result is lowercased, so spacing and
    case variants compare equal, e.g. 'MCE 0107 B' -> 'mce0107b'.
    """
    # split() with no argument discards every run of whitespace
    fragments = doc_id.split()
    compact = "".join(fragments)
    return compact.lower()

def validate_tech_doc_recognition(nlp, text_input, manual_counts: Dict[str, int]) -> Tuple[PrettyTable, float]:
    """
    Compare the pipeline's TECH_DOC entities against reference counts.

    Document IDs are compared after ``normalize_tech_doc_id`` (whitespace
    removed, lowercased), so 'MCE 0107 B' and 'mce0107b' count as the same ID
    and reference counts of spellings that collapse together are summed.

    Per-ID accuracy is ``min(model, manual) / manual * 100``: over-counting
    does not raise the score above 100%, and IDs the model finds that are
    absent from ``manual_counts`` are not listed and do not affect any total.

    Args:
        nlp: Callable spaCy pipeline; ``nlp(text).ents`` must yield entities
            exposing ``label_`` and ``text``.
        text_input: Either a single string or a list of strings (chunks).
            Chunks are processed one at a time and their entities pooled.
        manual_counts: Reference count for each technical document ID.

    Returns:
        A ``(table, overall_accuracy)`` tuple: a PrettyTable with one row per
        normalized reference ID plus a TOTAL row, and the overall accuracy
        percentage (0 when the reference counts sum to 0).
    """
    # Treat a bare string as a single chunk so both input shapes share one path
    chunks = text_input if isinstance(text_input, list) else [text_input]

    # Keep only the surface text of TECH_DOC entities; nothing else is used
    predicted_texts = [
        ent.text
        for chunk in chunks
        for ent in nlp(chunk).ents
        if ent.label_ == "TECH_DOC"
    ]

    # Merge reference counts by normalized ID and remember each spelling seen
    normalized_manual_counts: Dict[str, int] = {}
    variations_map: Dict[str, set] = {}
    for original_id, count in manual_counts.items():
        normalized_id = normalize_tech_doc_id(original_id)
        normalized_manual_counts[normalized_id] = normalized_manual_counts.get(normalized_id, 0) + count
        variations_map.setdefault(normalized_id, set()).add(original_id)

    # Count model predictions by normalized ID
    predicted_counts: Dict[str, int] = {}
    for entity_text in predicted_texts:
        normalized_ent = normalize_tech_doc_id(entity_text)
        predicted_counts[normalized_ent] = predicted_counts.get(normalized_ent, 0) + 1
        # Spellings are only recorded for IDs that have a reference count
        if normalized_ent in variations_map:
            variations_map[normalized_ent].add(entity_text)

    # Create comparison table
    table = PrettyTable()
    table.field_names = [
        "Technical Document",
        "Variations Found",
        "Manual Count",
        "Model Count",
        "Difference",
        "Accuracy %"
    ]
    table.align = "l"

    total_manual = 0
    total_predicted = 0
    total_correct = 0

    # Dict keys are already unique, so each normalized ID yields exactly one row
    for normalized_id, manual_count in normalized_manual_counts.items():
        predicted_count = predicted_counts.get(normalized_id, 0)

        # NOTE: `min` must be the builtin; this cell re-imports it from
        # `builtins`, presumably because a wildcard import shadows it.
        matched = min(predicted_count, manual_count)
        accuracy = matched / manual_count * 100 if manual_count > 0 else 0

        total_manual += manual_count
        total_predicted += predicted_count
        total_correct += matched

        # The alphabetically first spelling doubles as the row's display ID
        variations = sorted(variations_map[normalized_id])

        table.add_row([
            variations[0],
            ", ".join(variations),
            manual_count,
            predicted_count,
            predicted_count - manual_count,
            f"{accuracy:.1f}%"
        ])

    overall_accuracy = (total_correct / total_manual * 100) if total_manual > 0 else 0

    table.add_row([
        "TOTAL",
        "",
        total_manual,
        total_predicted,
        total_predicted - total_manual,
        f"{overall_accuracy:.1f}%"
    ])

    return table, overall_accuracy
# Run validation for the first PDF's reference counts. `results_table` and
# `overall_accuracy` are left as notebook globals; main() prints them later.
# NOTE(review): `nlp` and `big_chunk` are not defined in this cell -- they
# must already exist from earlier cells.
results_table, overall_accuracy = validate_tech_doc_recognition(nlp, big_chunk, manual_counts1)
print(results_table)

In [None]:
def plot_accuracies(accuracies):
    """
    Draw and show a bar chart of per-PDF accuracy with a dashed average line.

    Args:
        accuracies: Dict mapping PDF name to accuracy percentage. Insertion
            order determines bar order. An empty dict produces an empty chart
            without the average line.
    """
    # Imported locally so the function does not depend on a later cell having
    # imported matplotlib/numpy first (mirrors the plot_runtime_* helpers).
    import matplotlib.pyplot as plt
    import numpy as np

    plt.figure(figsize=(12, 6))

    # Create bar plot
    pdfs = list(accuracies.keys())
    acc_values = list(accuracies.values())

    bars = plt.bar(pdfs, acc_values, color='skyblue')

    # Customize plot
    plt.title('Technical Document Recognition Accuracy Across PDFs', pad=20)
    plt.xlabel('PDF Documents')
    plt.ylabel('Accuracy (%)')

    # Add value labels on bars
    for bar in bars:
        height = bar.get_height()
        plt.text(bar.get_x() + bar.get_width()/2., height,
                f'{height:.1f}%',
                ha='center', va='bottom')

    # Rotate x-axis labels
    plt.xticks(rotation=45)

    # Add grid
    plt.grid(axis='y', linestyle='--', alpha=0.7)

    # Set y-axis range
    plt.ylim(0, 100)

    # Add average line; skipped for empty input, where the mean is undefined
    if acc_values:
        avg_accuracy = np.mean(acc_values)
        plt.axhline(y=avg_accuracy, color='r', linestyle='--', alpha=0.8)
        # Anchor the label at the last bar, right-aligned
        plt.text(len(pdfs)-1, avg_accuracy, f'Average: {avg_accuracy:.1f}%',
                 va='bottom', ha='right', color='r')

    plt.tight_layout()
    plt.show()

In [None]:
import matplotlib.pyplot as plt
def main():
    """
    Print the validation report, then score and chart every PDF.

    Reads the notebook globals `results_table`, `overall_accuracy` and `nlp`.
    """
    rule = "=" * 80

    # Report header, per-ID table and headline figure
    print("\nTechnical Document Recognition Validation")
    print(rule)
    print(results_table)
    print(f"\nOverall Model Accuracy: {overall_accuracy:.1f}%")

    # Per-PDF accuracies feed straight into the bar chart
    per_pdf_accuracies = get_accuracies_for_all_pdfs(nlp)
    plot_accuracies(per_pdf_accuracies)


if __name__ == "__main__":
    main()

In [None]:
def process_pdf_text_token(extracted_text: str) -> List[str]:
    """Chunk the text with token_chunks_cl100k and return each chunk passed through clean_text, in order."""
    pieces = token_chunks_cl100k(extracted_text)
    return [clean_text(piece) for piece in pieces]

In [None]:
def process_pdf_text_recursive_char(extracted_text: str) -> List[str]:
    """Chunk the text with recursive_character_chunks and return each chunk passed through clean_text, in order."""
    pieces = recursive_character_chunks(extracted_text)
    return [clean_text(piece) for piece in pieces]

In [None]:
def process_pdf_text_recursive_token(extracted_text: str) -> List[str]:
    """
    Chunk the text with recursive_token_chunks and clean every chunk.

    Args:
        extracted_text: Raw text extracted from a PDF.

    Returns:
        The chunks, each passed through clean_text, in their original order.
    """
    chunks = recursive_token_chunks(extracted_text)

    cleaned_chunks = []
    for chunk in chunks:
        cleaned_chunk = clean_text(chunk)
        cleaned_chunks.append(cleaned_chunk)

    # Previously missing: without this the function always returned None
    return cleaned_chunks

In [None]:
def process_pdf_text_line(extracted_text: str) -> List[str]:
    """Chunk the text with get_text_chunks2 and return each chunk passed through clean_text, in order."""
    pieces = get_text_chunks2(extracted_text)
    return [clean_text(piece) for piece in pieces]

In [None]:
def process_pdf_text_semantic(extracted_text: str) -> List[str]:
    """Chunk the text with semantic_cluster_chunks and return each chunk passed through clean_text, in order."""
    pieces = semantic_cluster_chunks(extracted_text)
    return [clean_text(piece) for piece in pieces]

In [None]:
def process_pdf_text_structure(extracted_text: str) -> List[str]:
    """Chunk the text with structure_aware_chunks and return each chunk passed through clean_text, in order."""
    pieces = structure_aware_chunks(extracted_text)
    return [clean_text(piece) for piece in pieces]

In [None]:
# Module-level list of (file_path, manual_counts, pdf_name) triples for the
# first ten PDFs. Later cells pass it to evaluate_chunking_techniques and
# compare_chunking_techniques_with_runtime. It repeats the entries of the
# local `files_man` built inside get_accuracies_for_all_pdfs.
files_man = [
        (file1, manual_counts1, "MCE0110B"),
        (file2, manual_counts2, "MCE0107B"),
        (file3, manual_counts3, "MCE1157E"),
        (file4, manual_counts4, "MCG1093J"),
        (file5, manual_counts5, "MCG1107B"),
        (file6, manual_counts6, "MCG1202A"),
        (file7, manual_counts7, "MCH1744H"),
        (file8, manual_counts8, "MCH1759F"),
        (file9, manual_counts9, "MCH1798H"),
        (file10, manual_counts10, "MCH2629A")
    ]


In [None]:
import matplotlib.pyplot as plt
import numpy as np
from typing import Dict, List, Tuple, Callable

def compare_chunking_techniques(nlp, files_man):
    """
    Compares the accuracy of different chunking techniques across multiple PDF files.

    Each PDF is read once, then chunked by every technique; the chunks are
    scored with ``validate_tech_doc_recognition``. A technique that raises for
    a given PDF is recorded with 0.0 accuracy for that PDF instead of aborting
    the whole comparison.

    Args:
        nlp: The NLP pipeline with entity recognition
        files_man: List of tuples with (file_path, manual_counts, pdf_name)

    Returns:
        Tuple ``(technique_results, average_accuracies)``:
            technique_results: dict mapping technique name to a list of
                (pdf_name, accuracy) tuples, one per PDF in input order.
            average_accuracies: dict mapping technique name to the mean of
                those accuracies (0.0 when there are no results).
    """
    # Define all chunking techniques to compare
    chunking_techniques = {
        "Standard Character": process_pdf_text,
        "Line-by-Line": process_pdf_text_line,
        "Token-based": process_pdf_text_token,
        "Recursive Character": process_pdf_text_recursive_char,
        "Structure-aware": process_pdf_text_structure,
        "Semantic Clustering": process_pdf_text_semantic
    }

    # Store results for each technique
    technique_results = {name: [] for name in chunking_techniques}

    # Process each PDF with each chunking technique
    for file_path, manual_counts, pdf_name in files_man:
        print(f"\nProcessing {pdf_name}...")

        # Read PDF content (only once per file)
        pdf_df = spark.read.format("binaryFile").load(file_path).cache()
        try:
            binary_pdf = pdf_df.select("content").collect()[0]["content"]
            extracted_text = extract_text(binary_pdf)

            # Test each chunking technique
            for technique_name, chunking_function in chunking_techniques.items():
                try:
                    print(f"  Applying {technique_name} chunking...")
                    # Ensure the chunker receives a single string. Kept inside
                    # the try so a bad extraction is recorded as a failure of
                    # the technique rather than aborting the run.
                    if isinstance(extracted_text, list):
                        text_to_process = "\n".join(extracted_text)
                    else:
                        text_to_process = extracted_text

                    # Fixed: the normalized text is what gets chunked (the raw
                    # `extracted_text` used to be passed by mistake).
                    chunks = chunking_function(text_to_process)

                    # Validate recognition accuracy
                    _, accuracy = validate_tech_doc_recognition(nlp, chunks, manual_counts)
                    technique_results[technique_name].append((pdf_name, accuracy))
                except Exception as e:
                    print(f"    Error with {technique_name}: {str(e)}")
                    # Record 0 accuracy on error to avoid skipping
                    technique_results[technique_name].append((pdf_name, 0.0))
        finally:
            # Release the cached DataFrame even if reading/extraction failed
            pdf_df.unpersist()

    # Calculate average accuracies
    average_accuracies = {}
    for technique, results in technique_results.items():
        if results:
            average_accuracies[technique] = np.mean([acc for _, acc in results])
        else:
            average_accuracies[technique] = 0.0

    return technique_results, average_accuracies

def plot_average_accuracies(average_accuracies):
    """
    Build a bar chart of mean accuracy per chunking technique, best first.

    Args:
        average_accuracies: Dictionary mapping technique names to average accuracies

    Returns:
        The ``matplotlib.pyplot`` module, with the new figure as current figure.
    """
    plt.figure(figsize=(12, 6))

    # Highest average first
    ranked = sorted(average_accuracies.items(), key=lambda item: item[1], reverse=True)
    techniques = []
    accuracies = []
    for technique_name, mean_accuracy in ranked:
        techniques.append(technique_name)
        accuracies.append(mean_accuracy)

    rects = plt.bar(techniques, accuracies, color='skyblue')

    # Print each bar's value just above its top edge
    for rect in rects:
        top = rect.get_height()
        center_x = rect.get_x() + rect.get_width()/2.
        plt.text(center_x, top, f'{top:.1f}%', ha='center', va='bottom')

    # Titles, axes and grid
    plt.title('Average Accuracy by Chunking Technique', fontsize=15)
    plt.xlabel('Chunking Technique', fontsize=12)
    plt.ylabel('Average Accuracy (%)', fontsize=12)
    plt.ylim(0, 100)
    plt.xticks(rotation=45, ha='right')
    plt.grid(axis='y', linestyle='--', alpha=0.7)
    plt.tight_layout()

    return plt

def plot_technique_comparison_by_file(technique_results):
    """
    Build a grouped bar chart: one group per PDF, one bar per chunking technique.

    Args:
        technique_results: Dictionary mapping technique names to lists of (pdf_name, accuracy) tuples

    Returns:
        The ``matplotlib.pyplot`` module, with the new figure as current figure.
    """
    # Every PDF mentioned by any technique, alphabetically
    seen_pdfs = set()
    for results in technique_results.values():
        for pdf_name, _ in results:
            seen_pdfs.add(pdf_name)
    pdf_names = sorted(seen_pdfs)

    # technique -> {pdf_name: accuracy}; a repeated PDF keeps its last value
    accuracy_lookup = {}
    for technique, results in technique_results.items():
        per_pdf = {}
        for pdf_name, accuracy in results:
            per_pdf[pdf_name] = accuracy
        accuracy_lookup[technique] = per_pdf

    plt.figure(figsize=(14, 8))

    # Each group spans 0.8 of an x unit, shared equally by the techniques
    techniques = list(technique_results.keys())
    num_techniques = len(techniques)
    width = 0.8 / num_techniques

    indices = np.arange(len(pdf_names))

    for slot, technique in enumerate(techniques):
        # A technique with no result for a PDF is drawn as 0
        heights = [accuracy_lookup[technique].get(pdf, 0) for pdf in pdf_names]

        # Offset so the group of bars is centred on the PDF's tick
        positions = indices + (slot - num_techniques/2 + 0.5) * width

        plt.bar(positions, heights, width, label=technique)

    # Labels, legend and grid
    plt.xlabel('PDF Document', fontsize=12)
    plt.ylabel('Accuracy (%)', fontsize=12)
    plt.title('Chunking Technique Comparison by PDF Document', fontsize=15)
    plt.xticks(indices, pdf_names, rotation=45, ha='right')
    plt.ylim(0, 100)
    plt.legend(title='Chunking Technique')
    plt.grid(axis='y', linestyle='--', alpha=0.5)
    plt.tight_layout()

    return plt

def create_heatmap(technique_results):
    """
    Build a heatmap of accuracy with PDFs as rows and chunking techniques as columns.

    Args:
        technique_results: Dictionary mapping technique names to lists of (pdf_name, accuracy) tuples

    Returns:
        The ``matplotlib.pyplot`` module, with the new figure as current figure.
    """
    import seaborn as sns

    # Row labels: every PDF mentioned by any technique; column labels: techniques
    pdf_names = sorted({pdf_name
                        for results in technique_results.values()
                        for pdf_name, _ in results})
    techniques = list(technique_results.keys())

    # Cells default to 0 where a technique has no result for a PDF
    row_of = {pdf: row for row, pdf in enumerate(pdf_names)}
    data = np.zeros((len(pdf_names), len(techniques)))
    for col, technique in enumerate(techniques):
        filled = set()
        for pdf_name, accuracy in technique_results[technique]:
            # Only the first result recorded for a PDF is used
            if pdf_name not in filled:
                filled.add(pdf_name)
                data[row_of[pdf_name], col] = accuracy

    plt.figure(figsize=(14, 10))
    sns.heatmap(data, annot=True, fmt=".1f",
                xticklabels=techniques,
                yticklabels=pdf_names,
                cmap="YlGnBu", vmin=0, vmax=100)

    plt.title('Accuracy Heatmap: Chunking Techniques vs. PDF Documents', fontsize=15)
    plt.xlabel('Chunking Technique', fontsize=12)
    plt.ylabel('PDF Document', fontsize=12)
    plt.xticks(rotation=45, ha='right')
    plt.tight_layout()

    return plt

# Main execution function
def evaluate_chunking_techniques(nlp, files_man):
    """
    Run the chunking comparison and display its two summary charts.

    Args:
        nlp: The NLP pipeline with entity recognition
        files_man: List of tuples with (file_path, manual_counts, pdf_name)

    Returns:
        Tuple ``(technique_results, average_accuracies)`` as produced by
        ``compare_chunking_techniques``.
    """
    technique_results, average_accuracies = compare_chunking_techniques(nlp, files_man)

    # Build both charts before showing either. The heatmap (create_heatmap)
    # is intentionally not generated here.
    charts = [
        plot_average_accuracies(average_accuracies),
        plot_technique_comparison_by_file(technique_results),
    ]
    for chart in charts:
        chart.show()

    return technique_results, average_accuracies

# Example usage: runs the full comparison over the module-level `files_man`
# list and shows both charts. As the cell's last expression, the returned
# (technique_results, average_accuracies) tuple is also echoed as cell output.
evaluate_chunking_techniques(nlp, files_man)

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import time
from typing import Dict, List, Tuple, Callable

def compare_chunking_techniques_with_runtime(nlp, files_man):
    """
    Compares the accuracy and execution time of different chunking techniques across multiple PDF files.

    Each PDF is read once; every technique is then timed separately for the
    chunking step and for the NER/validation step. A technique that raises
    for a given PDF is recorded with 0.0 accuracy and -1 runtimes for that
    PDF; the -1 entries are excluded from the runtime averages.

    Args:
        nlp: The NLP pipeline with entity recognition
        files_man: List of tuples with (file_path, manual_counts, pdf_name)

    Returns:
        Tuple ``(technique_results, technique_runtimes, average_accuracies, average_runtimes)``:
            technique_results: technique -> list of (pdf_name, accuracy).
            technique_runtimes: technique -> list of
                (pdf_name, total_seconds, chunking_seconds, ner_seconds).
            average_accuracies: technique -> mean accuracy (failures count as 0.0).
            average_runtimes: technique -> dict with keys 'total', 'chunking'
                and 'ner' holding mean seconds over successful runs (all 0
                when no run succeeded).
    """
    # Define all chunking techniques to compare
    chunking_techniques = {
        "Standard Character": process_pdf_text,
        "Line-by-Line": process_pdf_text_line,
        "Token-based": process_pdf_text_token,
        "Recursive Character": process_pdf_text_recursive_char,
        "Structure-aware": process_pdf_text_structure,
        "Semantic Clustering": process_pdf_text_semantic
    }

    # Store results for each technique
    technique_results = {name: [] for name in chunking_techniques}
    technique_runtimes = {name: [] for name in chunking_techniques}

    # Process each PDF with each chunking technique
    for file_path, manual_counts, pdf_name in files_man:
        print(f"\nProcessing {pdf_name}...")

        # Read PDF content (only once per file)
        pdf_df = spark.read.format("binaryFile").load(file_path).cache()
        try:
            binary_pdf = pdf_df.select("content").collect()[0]["content"]
            extracted_text = extract_text(binary_pdf)

            # Test each chunking technique
            for technique_name, chunking_function in chunking_techniques.items():
                try:
                    print(f"  Applying {technique_name} chunking...")

                    # Ensure text is in the right format
                    if isinstance(extracted_text, list):
                        text_to_process = "\n".join(extracted_text)
                    else:
                        text_to_process = extracted_text

                    # perf_counter() is monotonic and high-resolution, so the
                    # intervals cannot be skewed by wall-clock adjustments.
                    start_time = time.perf_counter()
                    chunks = chunking_function(text_to_process)
                    chunking_time = time.perf_counter() - start_time

                    # Measure NER execution time
                    start_time = time.perf_counter()
                    _, accuracy = validate_tech_doc_recognition(nlp, chunks, manual_counts)
                    ner_time = time.perf_counter() - start_time

                    # Total processing time
                    total_time = chunking_time + ner_time

                    # Store results
                    technique_results[technique_name].append((pdf_name, accuracy))
                    technique_runtimes[technique_name].append((pdf_name, total_time, chunking_time, ner_time))

                    print(f"    Accuracy: {accuracy:.1f}%")
                    print(f"    Runtime: {total_time:.2f}s (Chunking: {chunking_time:.2f}s, NER: {ner_time:.2f}s)")

                except Exception as e:
                    print(f"    Error with {technique_name}: {str(e)}")
                    # Record 0 accuracy and -1 runtime on error to indicate failure
                    technique_results[technique_name].append((pdf_name, 0.0))
                    technique_runtimes[technique_name].append((pdf_name, -1, -1, -1))
        finally:
            # Release the cached DataFrame even if reading/extraction failed
            pdf_df.unpersist()

    # Calculate average accuracies
    average_accuracies = {}
    for technique, results in technique_results.items():
        if results:
            average_accuracies[technique] = np.mean([acc for _, acc in results])
        else:
            average_accuracies[technique] = 0.0

    # Calculate average runtimes over successful runs only
    average_runtimes = {}
    for technique, times in technique_runtimes.items():
        # Failed runs carry the -1 sentinel and are left out of the averages
        valid_times = [entry for entry in times if entry[1] >= 0]
        if valid_times:
            average_runtimes[technique] = {
                'total': np.mean([t for _, t, _, _ in valid_times]),
                'chunking': np.mean([c for _, _, c, _ in valid_times]),
                'ner': np.mean([n for _, _, _, n in valid_times])
            }
        else:
            average_runtimes[technique] = {'total': 0, 'chunking': 0, 'ner': 0}

    return technique_results, technique_runtimes, average_accuracies, average_runtimes




In [None]:


def plot_runtime_comparison_log_scale(average_runtimes):
    """
    Build a stacked bar chart (chunking + NER time) per technique on a log y-axis.

    Techniques are ordered by ascending total runtime.

    Args:
        average_runtimes: Dictionary mapping technique names to average runtime dictionaries

    Returns:
        The ``matplotlib.pyplot`` module, with the new figure as current figure.
    """
    import matplotlib.pyplot as plt
    import numpy as np

    plt.figure(figsize=(12, 6))

    # Fastest technique first
    ordered = sorted(average_runtimes.items(), key=lambda entry: entry[1]['total'])
    techniques = [name for name, _ in ordered]
    chunking_times = [timing['chunking'] for _, timing in ordered]
    ner_times = [timing['ner'] for _, timing in ordered]

    bar_width = 0.35
    index = np.arange(len(techniques))

    # NER time is stacked on top of chunking time
    plt.bar(index, chunking_times, bar_width, label='Chunking Time', color='skyblue')
    plt.bar(index, ner_times, bar_width, bottom=chunking_times, label='NER Time', color='lightcoral')

    for slot, chunk_secs in enumerate(chunking_times):
        combined = chunk_secs + ner_times[slot]
        # Total just above the stack
        plt.text(slot, combined * 1.05, f'{combined:.2f}s', ha='center', va='bottom', fontsize=8)
        # Chunking time at half the height of the chunking segment
        plt.text(slot, chunk_secs * 0.5, f'{chunk_secs:.3f}s', ha='center', va='center',
                 color='black', fontsize=8, fontweight='bold')

    # Titles, ticks, legend and grid
    plt.title('Average Runtime by Chunking Technique (Log Scale)', fontsize=15)
    plt.xlabel('Chunking Technique', fontsize=12)
    plt.ylabel('Runtime (seconds)', fontsize=12)
    plt.xticks(index, techniques, rotation=45, ha='right')
    plt.legend()
    plt.grid(axis='y', linestyle='--', alpha=0.7)

    # Logarithmic y-axis so small runtimes stay visible
    plt.yscale('log')

    # Faint major and minor gridlines on both axes
    plt.grid(True, which="both", ls="-", alpha=0.2)

    plt.tight_layout()
    return plt

def plot_runtime_components_side_by_side(average_runtimes):
    """
    Build a paired bar chart showing chunking time and NER time next to each other.

    Techniques are ordered by ascending total runtime.

    Args:
        average_runtimes: Dictionary mapping technique names to average runtime dictionaries

    Returns:
        The ``matplotlib.pyplot`` module, with the new figure as current figure.
    """
    import matplotlib.pyplot as plt
    import numpy as np

    plt.figure(figsize=(14, 8))

    # Fastest technique first
    ordered = sorted(average_runtimes.items(), key=lambda entry: entry[1]['total'])
    techniques = [name for name, _ in ordered]
    chunking_times = [timing['chunking'] for _, timing in ordered]
    ner_times = [timing['ner'] for _, timing in ordered]

    bar_width = 0.35
    index = np.arange(len(techniques))
    half = bar_width/2

    # Chunking bar to the left of the tick, NER bar to the right
    plt.bar(index - half, chunking_times, bar_width, label='Chunking Time', color='skyblue')
    plt.bar(index + half, ner_times, bar_width, label='NER Time', color='lightcoral')

    # Value labels on top of each bar
    for slot, seconds in enumerate(chunking_times):
        plt.text(slot - half, seconds, f'{seconds:.3f}s', ha='center', va='bottom', fontsize=9)

    for slot, seconds in enumerate(ner_times):
        plt.text(slot + half, seconds, f'{seconds:.2f}s', ha='center', va='bottom', fontsize=9)

    # Titles, ticks, legend and grid
    plt.title('Runtime Components by Chunking Technique', fontsize=15)
    plt.xlabel('Chunking Technique', fontsize=12)
    plt.ylabel('Runtime (seconds)', fontsize=12)
    plt.xticks(index, techniques, rotation=45, ha='right')
    plt.legend()
    plt.grid(axis='y', linestyle='--', alpha=0.7)

    plt.tight_layout()
    return plt

In [None]:
# Run the timed comparison over the module-level `files_man` list. This
# rebinds the notebook globals `technique_results` and `average_accuracies`
# alongside the new runtime structures.
technique_results, technique_runtimes, average_accuracies, average_runtimes = compare_chunking_techniques_with_runtime(nlp, files_man)

# Build both runtime charts; each helper returns the pyplot module, and
# .show() is not called in this cell.
log_plot = plot_runtime_comparison_log_scale(average_runtimes)
side_plot = plot_runtime_components_side_by_side(average_runtimes)
