In [None]:
import logging

logging.basicConfig(level=logging.INFO)

# Get the logger for docling and set its level
logging.getLogger('docling').setLevel(logging.INFO)
logging.getLogger('docling_core').setLevel(logging.INFO)
log = logging.getLogger(__name__)  # This makes your script a logging-aware application

In [None]:
def find_headers_in_html(doc, html_string, word):
    """Find headers in HTML that contain the given word, and include page info and parent H1 from doc."""
    try:
        from bs4 import BeautifulSoup
        from docling_core.types.doc.document import SectionHeaderItem
        soup = BeautifulSoup(html_string, 'html.parser')
        headers = []
        for tag in soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6']):
            text = tag.get_text().strip()
            if word.lower() in text.lower():
                # Find parent H1
                parent_h1 = None
                current = tag
                while current:
                    current = current.find_previous(['h1', 'h2', 'h3', 'h4', 'h5', 'h6'])
                    if current and current.name == 'h1':
                        parent_h1 = current.get_text().strip()
                        break
                # Find corresponding SectionHeaderItem in doc
                page = None
                for item in doc.texts:
                    if isinstance(item, SectionHeaderItem) and item.text.strip() == text:
                        prov = getattr(item, "prov", None)
                        if prov:
                            for p in prov:
                                pg = getattr(p, "page_no", None)
                                if pg is not None:
                                    page = int(pg)
                                    break
                        break
                headers.append((tag.name, text, page, parent_h1))
        return headers
    except ImportError:
        print("BeautifulSoup not available. Install with: pip install beautifulsoup4")
        return []

# Call the function
html = export_doc_html(doc)
replacement_headers = find_headers_in_html(doc, html, 'replacement')
for level, text, page, parent_h1 in replacement_headers:
    print(f"{level}: {text} (page {page}) - Parent H1: {parent_h1}")

In [None]:
from pathlib import Path
from docling_core.types.doc.document import DoclingDocument

chunk = Path(r'data/temp_chunk_0-91_kona.json')
doc = DoclingDocument.load_from_json(chunk)

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
from docling_core.types.doc.document import SectionHeaderItem

def compute_header_similarity(doc, query, top_n=5):
    """
    Compute semantic similarity between all headers in a DoclingDocument and a query string using TF-IDF vectors.
Args:
    doc: DoclingDocument object containing the document.
    query: The query string (phrase or word).
    top_n: Number of top similar headers to return (default: 5).

Returns:
    List of dicts with 'header_text', 'similarity_score', 'header_item', and 'page' for the top-N headers.
    Each dict contains:
    - 'header_text': The text of the header.
    - 'similarity_score': Cosine similarity score (0-1).
    - 'header_item': The SectionHeaderItem object.
    - 'page': The page number where the header appears (if available).
"""
    try:
        # Extract all headers from the document
        headers = []
        for item in doc.texts:
            if isinstance(item, SectionHeaderItem):
                # Get page info
                page = None
                if hasattr(item, 'prov') and item.prov:
                    for p in item.prov:
                        pg = getattr(p, 'page_no', None)
                        if pg is not None:
                            page = int(pg)
                            break
                
                headers.append({
                    'text': item.text,
                    'item': item,
                    'page': page
                })
        
        if not headers:
            return []
        
        # Extract texts for vectorization
        texts = [h['text'] for h in headers]
        texts.append(query)
        
        # Vectorize using TF-IDF
        vectorizer = TfidfVectorizer()
        tfidf_matrix = vectorizer.fit_transform(texts)
        
        # Query vector is the last one
        query_vec = tfidf_matrix[-1]
        
        # Compute similarities
        similarities = []
        for i, header in enumerate(headers):
            header_vec = tfidf_matrix[i]
            
            # Cosine similarity
            cos_sim = cosine_similarity(query_vec, header_vec)[0][0]
            
            similarities.append({
                'header_text': header['text'],
                'similarity_score': cos_sim,
                'header_item': header['item'],
                'page': header['page']
            })
        
        # Sort by similarity score descending and return top-N
        similarities.sort(key=lambda x: x['similarity_score'], reverse=True)
        return similarities[:top_n]

    except ImportError as e:
        print(f"Required libraries not available: {e}. Install scikit-learn and numpy.")
        return []
    except Exception as e:
        print(f"Error computing similarity: {e}")
        return []

In [None]:
query = "Where can I find the VIN?"
compute_header_similarity(doc, query)

In [None]:
import re
from typing import List, Any, Tuple, Dict, Set
from docling_core.types.doc.document import SectionHeaderItem, DoclingDocument
import spacy
import nltk
from nltk.corpus import wordnet as wn
from html import escape
from difflib import SequenceMatcher
from pathlib import Path
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

def rank_and_save_best_section_with_hdbscan(doc: DoclingDocument, query: str, top_n: int = 5, hdbscan_results: Dict[str, Any] = None) -> List[Dict[str, Any]]:
    """
    Rank headers in a DoclingDocument using query nouns/verbs + WordNet synonyms + HDBSCAN cluster similarity, and return top N matches.
    
    This function processes the document to extract headers, analyzes their linguistic features (nouns, verbs, synonyms),
    scores them against the query based on coverage, fuzzy matching, and cluster similarity, and returns the top N ranked headers with all data.
    
    Args:
        doc (DoclingDocument): The document object containing the text elements to process.
        query (str): The query string to match against header titles.
        top_n (int, optional): Number of top matches to return. Defaults to 5.
        hdbscan_results (Dict[str, Any], optional): Precomputed HDBSCAN results from perform_hdbscan_on_headers. If None, clustering is skipped.
    
    Returns:
        List[Dict[str, Any]]: List of dictionaries for the top N headers, each containing:
            - 'title': Header text
            - 'score': Matching score (0-1)
            - 'first_doc_page': First page number
            - 'doc_pages': List of all page numbers
            - 'level': Header level
            - 'nouns': Set of nouns in header
            - 'verbs': Set of verbs in header
            - 'syn_nouns': Set of synonym nouns
            - 'syn_verbs': Set of synonym verbs
            - 'noun_cov': Noun coverage score
            - 'verb_cov': Verb coverage score
            - 'noun_syn_cov': Synonym noun coverage
            - 'verb_syn_cov': Synonym verb coverage
            - 'fuzzy': Fuzzy match score
            - 'cluster_sim': Cluster similarity score (if hdbscan_results provided)
    
    Raises:
        Any exceptions from spaCy, NLTK, or file operations are not caught and will propagate.
    
    Example:
        >>> from pathlib import Path
        >>> from docling_core.types.doc.document import DoclingDocument
        >>> doc = DoclingDocument.load_from_json(Path("data/temp_chunk_0-91_kona.json"))
        >>> header_data = extract_header_texts(doc)
        >>> hdbscan_results = perform_hdbscan_on_headers(header_data)
        >>> results = rank_and_save_best_section_with_hdbscan(doc, "Where can I find the VIN?", top_n=3, hdbscan_results=hdbscan_results)
        >>> for r in results:
        ...     print(f"{r['title']} (page {r['first_doc_page']}) - Score: {r['score']:.3f}")
    """
    # Load spaCy model
    nlp = spacy.load("en_core_web_lg")
    
    # Download WordNet if needed
    # nltk.download("wordnet")
    # nltk.download("omw-1.4")
    
    def is_body(x: Any) -> bool:
        """Check if the text element belongs to the body content layer."""
        v = getattr(x, "content_layer", None)
        return getattr(v, "value", v) == "body"
    
    texts: List[Any] = list(doc.texts)
    headers: List[Tuple[int, SectionHeaderItem]] = [
        (i, t) for i, t in enumerate(texts) if isinstance(t, SectionHeaderItem) and is_body(t)
    ]
    
    def item_pages(obj: Any) -> Set[int]:
        """Extract page numbers from a document object using provenance or fallback."""
        pages: Set[int] = set()
        prov = getattr(obj, "prov", None)
        if prov:
            for p in prov:
                pg = getattr(p, "page_no", None)
                if pg is not None:
                    try:
                        pages.add(int(pg))
                    except Exception:
                        pass
        pr = getattr(obj, "page_ref", None)
        if pr is not None and not pages:
            try:
                pages.add(int(pr) + 1)
            except Exception:
                pages.add(1)
        return pages
    
    def nodes_pages(nodes: List[Any]) -> Set[int]:
        """Collect all unique page numbers from a list of nodes."""
        ps: Set[int] = set()
        for n in nodes:
            ps |= item_pages(n)
        return ps
    
    def slice_nodes(i: int) -> Tuple[SectionHeaderItem, List[Any]]:
        """Slice the document to get the section content under a header."""
        h = texts[i]
        lvl = getattr(h, "level", 3)
        nodes = []
        for j in range(i + 1, len(texts)):
            t = texts[j]
            if not is_body(t):
                continue
            if isinstance(t, SectionHeaderItem) and getattr(t, "level", 3) <= lvl:
                break
            nodes.append(t)
        return h, nodes
    
    def has_content(nodes: List[Any]) -> bool:
        """Check if the section nodes contain meaningful content."""
        textish = 0
        structural = 0
        for n in nodes:
            name = n.__class__.__name__.lower()
            if hasattr(n, "text") and name != "sectionheaderitem":
                if re.search(r"\w", getattr(n, "text", "") or ""):
                    textish += 1
            if hasattr(n, "items") or hasattr(n, "num_rows") or hasattr(n, "caption"):
                structural += 1
        return textish >= 1 or structural >= 1
    
    # Build header index
    IndexItem = Dict[str, Any]
    index: List[IndexItem] = []
    
    for i, h in headers:
        title = getattr(h, "text", "") or ""
        header_ps = item_pages(h)
        nodes = slice_nodes(i)[1]
        if not has_content(nodes):
            continue
        section_ps = nodes_pages(nodes)
        
        nouns: Set[str] = set()
        verbs: Set[str] = set()
        
        doc_h = nlp(title)
        for tok in doc_h:
            if tok.is_stop or not tok.is_alpha:
                continue
            lemma = tok.lemma_.lower()
            if tok.pos_ in ("NOUN", "PROPN"):
                nouns.add(lemma)
            elif tok.pos_ in ("VERB",):
                verbs.add(lemma)
        
        syns_n: Set[str] = set()
        syns_v: Set[str] = set()
        for n in nouns:
            for s in wn.synsets(n, pos=wn.NOUN):
                if hasattr(s, 'lemma_names'):
                    for l in s.lemma_names():
                        syns_n.add(l.replace("_", " ").lower())
        for v in verbs:
            for s in wn.synsets(v, pos=wn.VERB):
                if hasattr(s, 'lemma_names'):
                    for l in s.lemma_names():
                        syns_v.add(l.replace("_", " ").lower())
        
        index.append({
            "i": i,
            "header_pages": sorted(header_ps),
            "section_pages": sorted(section_ps),
            "doc_pages": sorted((header_ps | section_ps)),
            "level": getattr(h, "level", 3),
            "title": title,
            "nouns": nouns,
            "verbs": verbs,
            "syn_nouns": syns_n,
            "syn_verbs": syns_v,
        })
    
    # Extract query nouns/verbs
    q_nouns, q_verbs = set(), set()
    q_text = query
    qdoc = nlp(q_text)
    for tok in qdoc:
        if tok.is_stop or not tok.is_alpha:
            continue
        lemma = tok.lemma_.lower()
        if tok.pos_ in ("NOUN", "PROPN"):
            q_nouns.add(lemma)
        elif tok.pos_ in ("VERB",):
            q_verbs.add(lemma)
    
    # Expand query with WordNet
    q_syn_n, q_syn_v = set(), set()
    for n in q_nouns:
        for s in wn.synsets(n, pos=wn.NOUN):
            if hasattr(s, 'lemma_names'):
                for l in s.lemma_names():
                    q_syn_n.add(l.replace("_", " ").lower())
    for v in q_verbs:
        for s in wn.synsets(v, pos=wn.VERB):
            if hasattr(s, 'lemma_names'):
                for l in s.lemma_names():
                    q_syn_v.add(l.replace("_", " ").lower())
    
    # Prepare cluster similarity if HDBSCAN results provided
    cluster_sim_scores = {}
    if hdbscan_results:
        labels = hdbscan_results.get("labels", [])
        cluster_names = hdbscan_results.get("cluster_names", {})
        
        # Vectorize cluster names and query for similarity
        cluster_texts = list(cluster_names.values())
        cluster_texts.append(q_text)
        if cluster_texts:
            vectorizer = TfidfVectorizer()
            cluster_matrix = vectorizer.fit_transform(cluster_texts)
            query_vec = cluster_matrix[-1]
            
            for idx, cluster_name in enumerate(cluster_names.values()):
                cluster_vec = cluster_matrix[idx]
                sim = cosine_similarity(query_vec, cluster_vec)[0][0]
                cluster_sim_scores[cluster_name] = sim
    
    # Score headers
    cands = []
    for h_idx, h in enumerate(index):
        hn = h['nouns']; hv = h['verbs']
        syn_n = h['syn_nouns']; syn_v = h['syn_verbs']
        
        noun_cov = len(q_nouns & hn) / max(1, len(q_nouns))
        verb_cov = len(q_verbs & hv) / max(1, len(q_verbs))
        
        noun_syn_cov = len(q_syn_n & (hn | syn_n)) / max(1, len(q_syn_n)) if q_syn_n else 0.0
        verb_syn_cov = len(q_syn_v & (hv | syn_v)) / max(1, len(q_syn_v)) if q_syn_v else 0.0
        
        fuzzy = SequenceMatcher(None, q_text.lower(), h['title'].lower()).ratio()
        
        # Cluster similarity
        cluster_sim = 0.0
        if hdbscan_results and h_idx < len(labels):
            cluster_label = labels[h_idx]
            if cluster_label != -1:
                cluster_name = cluster_names.get(f"cluster_{cluster_label}", "")
                cluster_sim = cluster_sim_scores.get(cluster_name, 0.0)
        
        score = (
            0.35 * noun_cov +
            0.25 * verb_cov +
            0.15 * noun_syn_cov +
            0.10 * verb_syn_cov +
            0.05 * fuzzy +
            0.10 * cluster_sim  # Add cluster similarity
        )
        
        doc_pages = h.get('doc_pages', [])
        first_doc_page = doc_pages[0] if doc_pages else (h.get('header_pages') or h.get('section_pages') or [1])[0]
        
        cands.append({**h, 'score': score, 'fuzzy': fuzzy,
                      'noun_cov': noun_cov, 'verb_cov': verb_cov,
                      'noun_syn_cov': noun_syn_cov, 'verb_syn_cov': verb_syn_cov,
                      'cluster_sim': cluster_sim, 'first_doc_page': first_doc_page})
    
    cands.sort(key=lambda x: x['score'], reverse=True)
    
    # Print top results
    print("Top headers:")
    for r, c in enumerate(cands[:top_n], start=1):
        pages_str = ','.join(str(p) for p in c.get('doc_pages', []) or c.get('header_pages', []) or c.get('section_pages', []) or ["?"])
        print(f"{r:>2}. p{c['first_doc_page']:>4} h{c['level']} score={c['score']:.3f} | "
              f"noun={c['noun_cov']:.2f} verb={c['verb_cov']:.2f} n_syn={c['noun_syn_cov']:.2f} "
              f"v_syn={c['verb_syn_cov']:.2f} fuzz={c['fuzzy']:.2f} clust={c['cluster_sim']:.2f} :: {c['title']}  [pages: {pages_str}]")
    
    return cands[:top_n]

In [None]:
from pathlib import Path
from docling_core.types.doc.document import DoclingDocument

chunk = Path(r'data/temp_chunk_0-91_kona.json')
doc = DoclingDocument.load_from_json(chunk)

query = "Where can I find the VIN number?"
results = rank_and_save_best_section_with_hdbscan(doc, query, top_n=5)
print("\nReturned data for top matches:")
for r in results:
    print(f"Title: {r['title']}, Page: {r['first_doc_page']}, Score: {r['score']:.3f}")

In [None]:
import spacy
from spacy.cli.download import download
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
from docling_core.types.doc.document import SectionHeaderItem


nlp = spacy.load("en_core_web_lg")


def compute_semantic_header_similarity(doc, query, top_n=5):
    """
    Compute semantic similarity between all headers in a DoclingDocument and a query string using spaCy word vectors.
    Args:
        doc: DoclingDocument object containing the document.
        query: The query string.
        top_n: Number of top similar headers to return (default: 5).
    Returns:
        List of dicts with 'header_text', 'similarity_score', 'header_item', and 'page' for the top-N headers.
    """
    try:
        
        # Extract all headers
        headers = []
        for item in doc.texts:
            if isinstance(item, SectionHeaderItem):
                page = None
                if hasattr(item, 'prov') and item.prov:
                    for p in item.prov:
                        pg = getattr(p, 'page_no', None)
                        if pg is not None:
                            page = int(pg)
                            break
                headers.append({
                    'text': item.text,
                    'item': item,
                    'page': page
                })
        
        if not headers:
            return []
        
        # Function to get vector for a text
        def get_vector(text):
            doc_nlp = nlp(text)
            vectors = [token.vector for token in doc_nlp if token.has_vector and not token.is_stop]
            if vectors:
                return np.mean(vectors, axis=0)
            else:
                return np.zeros(300)  # en_core_web_lg has 300 dimensions
        
        # Get vectors for query and headers
        query_vec = get_vector(query)
        header_vecs = [get_vector(h['text']) for h in headers]
        
        # Compute cosine similarities
        similarities = []
        for i, header in enumerate(headers):
            header_vec = header_vecs[i]
            # Cosine similarity
            cos_sim = cosine_similarity([query_vec], [header_vec])[0][0]
            similarities.append({
                'header_text': header['text'],
                'similarity_score': cos_sim,
                'header_item': header['item'],
                'page': header['page']
            })
        
        # Sort by similarity score descending
        similarities.sort(key=lambda x: x['similarity_score'], reverse=True)
        return similarities[:top_n]
    
    except Exception as e:
        print(f"Error computing semantic similarity: {e}")
        return []

# Test the semantic similarity
query = "Where can I find the VIN?"
compute_semantic_header_similarity(doc, query)

In [None]:
# Test the semantic similarity
query = "Where can I find the VIN?"
rank_and_save_best_section(doc, query)

In [None]:
import spacy
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
from docling_core.types.doc.document import SectionHeaderItem

# Load spaCy transformer model for better semantic understanding
nlp = spacy.load("en_core_web_lg")

def match_headers_to_query(doc: DoclingDocument, query: str, top_n: int = 5):
    """
    Match headers in a DoclingDocument to a query using spaCy's transformer model for semantic similarity.
    
    Args:
        doc (DoclingDocument): The document to search.
        query (str): The query string.
        top_n (int): Number of top matching headers to return.
    
    Returns:
        List[dict]: List of dictionaries with header details and similarity scores.
    """
    # Extract all headers
    headers = []
    for item in doc.texts:
        if isinstance(item, SectionHeaderItem):
            page = None
            if hasattr(item, 'prov') and item.prov:
                for p in item.prov:
                    pg = getattr(p, 'page_no', None)
                    if pg is not None:
                        page = int(pg)
                        break
            headers.append({
                'text': item.text,
                'item': item,
                'page': page
            })
    
    if not headers:
        return []
    
    # Get query doc
    query_doc = nlp(query)
    
    # Get header docs
    header_docs = [nlp(h['text']) for h in headers]
    
    # Compute similarities using spaCy's similarity
    similarities = []
    for i, header in enumerate(headers):
        header_doc = header_docs[i]
        sim_score = query_doc.similarity(header_doc)
        similarities.append({
            'header_text': header['text'],
            'similarity_score': sim_score,
            'header_item': header['item'],
            'page': header['page']
        })
    
    # Sort by similarity
    similarities.sort(key=lambda x: x['similarity_score'], reverse=True)
    return similarities[:top_n]

# Example usage
query = "Where can I find the VIN?"
results = match_headers_to_query(doc, query)
for result in results:
    print(f"Score: {result['similarity_score']:.3f} | {result['header_text']} (page {result['page']})")

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np


def compute_semantic_similarity(headers_list, query):
    """
    Compute semantic similarity between a list of headers and a query string using TF-IDF vectors.
    
    Args:
        headers_list: List of tuples (level, text, page, parent_h1) from find_headers_in_html or similar.
        query: The query string (phrase or word).
    
    Returns:
        List of dicts with 'header', 'cosine_similarity', and 'euclidean_distance'.
    """
    try:

        
        # Extract texts from headers
        texts = [text for _, text, _, _ in headers_list]
        texts.append(query)
        
        # Vectorize using TF-IDF
        vectorizer = TfidfVectorizer()
        tfidf_matrix = vectorizer.fit_transform(texts)
        
        # Query vector is the last one
        query_vec = tfidf_matrix[-1]
        
        similarities = []
        for i, header in enumerate(headers_list):
            header_vec = tfidf_matrix[i]
            
            # Cosine similarity
            cos_sim = cosine_similarity(query_vec, header_vec)[0][0]
            
            # Euclidean distance
            euclidean = np.linalg.norm(query_vec.toarray() - header_vec.toarray())
            
            similarities.append({
                'header': header,
                'cosine_similarity': cos_sim,
                'euclidean_distance': euclidean
            })
        
        return similarities
    except ImportError as e:
        print(f"Required libraries not available: {e}. Install scikit-learn and numpy.")
        return []

In [None]:
html = export_doc_html(doc)
replacement_headers = find_headers_in_html(doc, html, 'cabin air filter')
compute_semantic_similarity(replacement_headers, 'replace cabin air filter')

In [None]:
from docling_core.types.doc.document import SectionHeaderItem

def find_headers_with_word(doc, word):
    """Find all SectionHeaderItem that contain the given word in their text."""
    matches = []
    for text in doc.texts:
        if isinstance(text, SectionHeaderItem):
            if word.lower() in text.text.lower():
                matches.append(text)
    return matches

# Example usage
replacement_headers = find_headers_with_word(doc, 'replacement')
for header in replacement_headers:
    print(header)

In [None]:
# Parameters for header-only selection
query = "Where can I find the VIN?"  # set this per user request
print("Query:", query)

In [None]:
chunk = Path(r'data/temp_chunk_0-91_kona.json')
doc = DoclingDocument.load_from_json(chunk)

In [97]:
from docling_core.types.doc.document import SectionHeaderItem
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import LatentDirichletAllocation
import hdbscan
import numpy as np
from scipy import sparse
from typing import List, Dict, Any

def extract_header_texts(doc) -> List[Dict[str, Any]]:
    """Extract all section header texts from a DoclingDocument with indices and pages."""
    headers = []
    for i, item in enumerate(doc.texts):
        if isinstance(item, SectionHeaderItem):
            page = None
            if hasattr(item, 'prov') and item.prov:
                for p in item.prov:
                    pg = getattr(p, 'page_no', None)
                    if pg is not None:
                        page = int(pg)
                        break
            headers.append({
                'text': item.text.strip(),
                'index': i,
                'page': page
            })
    return headers

In [98]:
def perform_lda_on_headers(header_data: List[Dict[str, Any]], n_topics: int = 10, max_iter: int = 10, random_state: int = 42) -> Dict[str, Any]:
    """Perform LDA topic modeling on header texts.
    
    Args:
        header_data: List of dicts with 'text', 'index', 'page'.
        n_topics: Number of topics to extract.
        max_iter: Maximum iterations for LDA.
        random_state: Random state for reproducibility.
    
    Returns:
        Dict with 'topics' (list of dicts with topic_id, top_words, top_headers), 'topic_distributions', and 'vectorizer'.
    """
    if not header_data:
        return {"topics": [], "topic_distributions": np.array([]), "vectorizer": None}
    
    header_texts = [h['text'] for h in header_data]
    
    # Vectorize
    vectorizer = TfidfVectorizer(stop_words='english', max_df=0.95, min_df=2)
    tfidf_matrix = vectorizer.fit_transform(header_texts)
    
    # Fit LDA
    lda = LatentDirichletAllocation(n_components=n_topics, max_iter=max_iter, random_state=random_state)
    topic_distributions = lda.fit_transform(tfidf_matrix)
    
    # Get top words per topic
    feature_names = vectorizer.get_feature_names_out()
    topics = []
    for topic_idx, topic in enumerate(lda.components_):
        top_words = [feature_names[i] for i in topic.argsort()[:-11:-1]]  # Top 10 words
        
        # Get top 5 headers for this topic
        topic_probs = topic_distributions[:, topic_idx]
        top_header_indices = topic_probs.argsort()[-5:][::-1]
        top_headers = []
        for idx in top_header_indices:
            header = header_data[idx]
            top_headers.append({
                'text': header['text'],
                'page': header['page'],
                'probability': topic_probs[idx]
            })
        
        topics.append({
            "topic_id": topic_idx, 
            "top_words": top_words,
            "top_headers": top_headers
        })
    
    return {
        "topics": topics,
        "topic_distributions": topic_distributions,
        "vectorizer": vectorizer,
        "lda_model": lda
    }



In [107]:
def perform_hdbscan_on_headers(header_data: List[Dict[str, Any]], min_cluster_size: int = 5, min_samples: int = 1) -> Dict[str, Any]:
    """Perform HDBSCAN clustering on header texts using TF-IDF vectors with filtering of generic terms.

    Args:
        header_data: List of dicts with 'text', 'index', 'page'.
        min_cluster_size: Minimum size of clusters.
        min_samples: Minimum samples in neighborhood.

    Returns:
        Dict with 'labels', 'probabilities', 'cluster_info', 'cluster_names', 'cluster_headers', and 'vectorizer'.
    """
    if not header_data:
        return {"labels": [], "probabilities": [], "cluster_info": {}, "cluster_names": {}, "cluster_headers": {}, "vectorizer": None}

    # Filter out generic/irrelevant headers
    generic_terms = {
        'information', 'caution', 'warning', 'note', 'notice', 'important',
        'the', 'illustration',
        'shape', 'differ', 'actual', 'may', 'from', 'the', 'and', 'or', 'but',
        'if', 'when', 'where', 'how', 'what', 'why', 'which', 'who', 'that',
        'this', 'these', 'those', 'here', 'there', 'then', 'now', 'always',
        'never', 'sometimes', 'often', 'usually', 'generally', 'specifically',
        'particularly', 'especially', 'mainly', 'primarily', 'basically', 'your'
    }

    filtered_header_data = []
    for header in header_data:
        text_lower = header['text'].lower().strip()

        # Skip headers that are just generic terms
        if text_lower in generic_terms:
            continue

        # Skip headers that contain mostly generic terms
        words = text_lower.split()
        if len(words) <= 2:  # Very short headers
            if any(word in generic_terms for word in words):
                continue

        # Skip headers that are too generic (contain only stop words or generic terms)
        import re
        meaningful_words = [re.sub(r'[^\w\s]', '', word) for word in words if re.sub(r'[^\w\s]', '', word) not in generic_terms and len(re.sub(r'[^\w\s]', '', word)) > 2]
        if len(meaningful_words) < 1:
            continue

        filtered_header_data.append(header)

    print(f"Filtered {len(header_data) - len(filtered_header_data)} generic headers. Remaining: {len(filtered_header_data)}")

    if not filtered_header_data:
        return {"labels": [], "probabilities": [], "cluster_info": {}, "cluster_names": {}, "cluster_headers": {}, "vectorizer": None}

    header_texts = [h['text'] for h in filtered_header_data]

    # Enhanced stop words for clustering
    custom_stop_words = [
        'information', 'caution', 'warning', 'note', 'notice', 'important',
        'the', 'and', 'or', 'but', 'if', 'when', 'where', 'how', 'what', 'why',
        'which', 'who', 'that', 'this', 'these', 'those', 'here', 'there',
        'then', 'now', 'always', 'never', 'sometimes', 'often', 'usually',
        'generally', 'specifically', 'particularly', 'especially', 'mainly',
        'primarily', 'basically', 'may', 'can', 'will', 'should', 'would',
        'could', 'might', 'must', 'shall', 'do', 'does', 'did', 'doing',
        'done', 'have', 'has', 'had', 'having', 'be', 'is', 'am', 'are',
        'was', 'were', 'being', 'been', 'to', 'of', 'in', 'on', 'at', 'by',
        'for', 'with', 'as', 'from', 'into', 'through', 'during', 'before',
        'after', 'above', 'below', 'between', 'among', 'within', 'without',
        'against', 'along', 'around', 'behind', 'beside', 'besides', 'beyond',
        'inside', 'outside', 'under', 'over', 'across', 'throughout', 'towards',
        'shape', 'differ', 'illustration', 'actual', 'your', 'warmers'
    ]

    # Vectorize with enhanced stop words
    vectorizer = TfidfVectorizer(
        stop_words='english',
        max_df=0.95,
        min_df=2,
        ngram_range=(1, 2),  # Include bigrams for better context
        token_pattern=r'(?u)\b[a-zA-Z]{3,}\b'  # Only words with 3+ characters
    )

    # Add custom stop words
    if vectorizer.stop_words:
        combined_stop_words = list(vectorizer.stop_words) + custom_stop_words
    else:
        combined_stop_words = custom_stop_words

    vectorizer.set_params(stop_words=combined_stop_words)

    tfidf_matrix = vectorizer.fit_transform(header_texts)

    # Convert to dense array for HDBSCAN
    if sparse.issparse(tfidf_matrix):
        tfidf_matrix = tfidf_matrix.toarray()

    # Fit HDBSCAN
    clusterer = hdbscan.HDBSCAN(min_cluster_size=min_cluster_size, min_samples=min_samples)
    labels = clusterer.fit_predict(tfidf_matrix)
    probabilities = clusterer.probabilities_

    # Cluster info
    unique_labels = set(labels)
    cluster_info = {}
    for label in unique_labels:
        if label == -1:
            cluster_info["noise"] = sum(labels == label)
        else:
            cluster_info[f"cluster_{label}"] = sum(labels == label)

    # Name clusters by top words (excluding generic terms)
    feature_names = vectorizer.get_feature_names_out()

    # Filter feature names to remove generic terms
    import re
    meaningful_features = [f for f in feature_names if re.sub(r'[^\w\s]', '', f.lower()) not in generic_terms and len(re.sub(r'[^\w\s]', '', f)) > 2]

    cluster_names = {}
    cluster_headers = {}
    for label in unique_labels:
        if label == -1:
            cluster_names["noise"] = "Noise/Unclustered"
            cluster_headers["noise"] = []
        else:
            # Get indices of headers in this cluster
            cluster_indices = [i for i, l in enumerate(labels) if l == label]
            cluster_headers_list = []
            if cluster_indices:
                # Average TF-IDF vectors for the cluster
                cluster_vectors = tfidf_matrix[cluster_indices]
                avg_vector = np.mean(cluster_vectors, axis=0)

                # Get top meaningful words
                # Map back to original feature indices
                feature_indices = [i for i, f in enumerate(feature_names) if f in meaningful_features]
                if feature_indices:
                    meaningful_scores = avg_vector[feature_indices]
                    top_indices = meaningful_scores.argsort()[-5:][::-1]
                    top_words = [meaningful_features[i] for i in top_indices]
                    cluster_names[f"cluster_{label}"] = ", ".join(top_words)
                else:
                    cluster_names[f"cluster_{label}"] = "Generic Cluster"

                # Get headers in cluster
                for idx in cluster_indices:
                    header = filtered_header_data[idx]
                    cluster_headers_list.append({
                        'text': header['text'],
                        'page': header['page'],
                        'probability': probabilities[idx] if idx < len(probabilities) else None
                    })
            else:
                cluster_names[f"cluster_{label}"] = "Empty Cluster"
                cluster_headers_list = []
            cluster_headers[f"cluster_{label}"] = cluster_headers_list

    return {
        "labels": labels,
        "probabilities": probabilities,
        "cluster_info": cluster_info,
        "cluster_names": cluster_names,
        "cluster_headers": cluster_headers,
        "vectorizer": vectorizer,
        "clusterer": clusterer,
        "filtered_headers": filtered_header_data
    }

In [108]:
# Suppress hdbscan warnings
import warnings
warnings.filterwarnings("ignore", category=SyntaxWarning, module="hdbscan")

# Experiment with LDA on headers
header_data = extract_header_texts(doc)
print(f"Extracted {len(header_data)} headers.")

# LDA Experiment
lda_results = perform_lda_on_headers(header_data, n_topics=30, max_iter=20)
print("\nLDA Topics:")
for topic in lda_results["topics"]:
    print(f"Topic {topic['topic_id']}: {', '.join(topic['top_words'])}")
    print("  Top headers:")
    for header in topic['top_headers']:
        print(f"    - '{header['text']}' (page {header['page']}, prob: {header['probability']:.3f})")
    print()

# HDBSCAN Experiment
hdbscan_results = perform_hdbscan_on_headers(header_data, min_cluster_size=3, min_samples=1)
print("\nHDBSCAN Clusters:")
print(f"Cluster info: {hdbscan_results['cluster_info']}")
print(f"Labels: {hdbscan_results['labels'][:10]}...")  # First 10 labels
print("\nCluster Names and Headers:")
for cluster, name in hdbscan_results["cluster_names"].items():
    count = hdbscan_results["cluster_info"].get(cluster, 0)
    print(f"{cluster}: {name} ({count} headers)")
    if cluster in hdbscan_results["cluster_headers"]:
        for header in hdbscan_results["cluster_headers"][cluster][:5]:  # Show first 5 headers
            print(f"  - '{header['text']}' (page {header['page']})")
    print()



Extracted 261 headers.

LDA Topics:
Topic 0: consumer, safety, vehicle, information, reporting, defects, ventilation, support, srs, use
  Top headers:
    - 'Consumer Information' (page 40, prob: 0.353)
    - '2.  Vehicle Information, Consumer Information and Reporting Safety Defects' (page 19, prob: 0.129)
    - 'TWO-WAY RADIO OR CELLULAR' (page 2, prob: 0.033)
    - 'Do not lie down' (page 66, prob: 0.033)
    - 'Tires And Wheels' (page 32, prob: 0.033)

  Top headers:
    - '2. Why HYUNDAI Genuine Parts?' (page 10, prob: 0.646)
    - 'Guide To HYUNDAI Genuine Parts' (page 9, prob: 0.646)
    - '3. How can you tell if you are purchasing HYUNDAI Genuine Parts?' (page 10, prob: 0.646)

Topic 2: label, number, adjusting, height, locking, recommended, vehicle, engine, air, ventilation
  Top headers:
    - 'Recommended SAE viscosity number' (page 36, prob: 0.600)
    - 'Adjusting the height up and down' (page 53, prob: 0.599)
    - 'Adjusting the height up and down' (page 55, prob: 0.599)



In [109]:
# Test improved ranking with HDBSCAN
query = "Where can I find the VIN number?"
print(f"\nTesting improved ranking for query: '{query}'")
improved_results = rank_and_save_best_section_with_hdbscan(doc, query, top_n=5, hdbscan_results=hdbscan_results)
print("\nImproved results with HDBSCAN:")
for r in improved_results:
    print(f"Title: {r['title']}, Page: {r['first_doc_page']}, Score: {r['score']:.3f}, Cluster Sim: {r['cluster_sim']:.3f}")


Testing improved ranking for query: 'Where can I find the VIN number?'
Top headers:
 1. p  37 h1 score=0.521 | noun=1.00 verb=0.00 n_syn=1.00 v_syn=0.00 fuzz=0.42 clust=0.00 :: Vehicle Identification Number (VIN)  [pages: 37]
 2. p  38 h1 score=0.352 | noun=0.50 verb=0.00 n_syn=1.00 v_syn=0.00 fuzz=0.53 clust=0.00 :: Engine Number  [pages: 38]
 3. p  33 h1 score=0.040 | noun=0.00 verb=0.00 n_syn=0.00 v_syn=0.00 fuzz=0.40 clust=0.20 :: Air Conditioning System  [pages: 33]
 4. p  20 h1 score=0.036 | noun=0.00 verb=0.00 n_syn=0.00 v_syn=0.00 fuzz=0.32 clust=0.20 :: Exterior Overview (Front View)  [pages: 20]
 5. p  17 h1 score=0.030 | noun=0.00 verb=0.00 n_syn=0.09 v_syn=0.00 fuzz=0.33 clust=0.00 :: Vehicle Data Collection And Event Data Recorders  [pages: 17]

Improved results with HDBSCAN:
Title: Vehicle Identification Number (VIN), Page: 37, Score: 0.521, Cluster Sim: 0.000
Title: Engine Number, Page: 38, Score: 0.352, Cluster Sim: 0.000
Title: Air Conditioning System, Page: 33, Score

In [82]:
def find_cluster_for_query(query: str, hdbscan_results: Dict[str, Any], header_data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Find which cluster a query belongs to using semantic similarity with enhanced query processing.

    Args:
        query: The query string to classify.
        hdbscan_results: Results from perform_hdbscan_on_headers containing the trained model.
        header_data: The original header data used for clustering.

    Returns:
        Dict with 'predicted_cluster', 'cluster_name', 'cluster_headers', 'similarity_score', and 'all_cluster_similarities'.
    """
    if not hdbscan_results or not hdbscan_results.get("vectorizer"):
        return {"error": "No HDBSCAN results or vectorizer found"}

    # Enhanced query processing
    import spacy
    import nltk
    from nltk.corpus import wordnet as wn

    nlp = spacy.load("en_core_web_lg")

    # Extract key terms from query
    q_nouns = set()
    qdoc = nlp(query)
    for tok in qdoc:
        if tok.is_stop or not tok.is_alpha:
            continue
        lemma = tok.lemma_.lower()
        if tok.pos_ in ("NOUN", "PROPN"):
            q_nouns.add(lemma)

    # Expand query with synonyms
    expanded_terms = set()
    for noun in q_nouns:
        expanded_terms.add(noun)
        for synset in wn.synsets(noun, pos=wn.NOUN):
            for lemma in synset.lemma_names():
                expanded_terms.add(lemma.replace("_", " ").lower())

    # Create expanded query
    expanded_query = query + " " + " ".join(expanded_terms)
    print(f"Original query: '{query}'")
    print(f"Expanded query: '{expanded_query}'")

    # Use semantic similarity instead of TF-IDF
    cluster_names = hdbscan_results["cluster_names"]
    cluster_headers = hdbscan_results["cluster_headers"]

    # Get vector for expanded query
    query_doc = nlp(expanded_query)

    # Compute semantic similarities to cluster names
    similarities = {}
    for cluster_key, cluster_name in cluster_names.items():
        if cluster_key != "noise":
            cluster_doc = nlp(cluster_name)
            similarity = query_doc.similarity(cluster_doc)
            similarities[int(cluster_key.split("_")[1])] = similarity

    # Find the most similar cluster
    if similarities:
        best_cluster = max(similarities, key=similarities.get)
        best_similarity = similarities[best_cluster]
    else:
        best_cluster = -1
        best_similarity = 0.0

    # Get cluster info
    cluster_key = f"cluster_{best_cluster}" if best_cluster != -1 else "noise"
    cluster_name = cluster_names.get(cluster_key, "Unknown")
    headers_in_cluster = cluster_headers.get(cluster_key, [])

    # Sort similarities
    sorted_similarities = {cluster_names.get(f"cluster_{k}", f"cluster_{k}"): v for k, v in similarities.items()}
    sorted_similarities = dict(sorted(sorted_similarities.items(), key=lambda x: x[1], reverse=True))

    # Print formatted output
    print("== Testing Cluster Prediction ===")
    print(f"\nQuery: '{query}'")
    print(f"Predicted Cluster: {cluster_key} - {cluster_name}")
    print(f"Similarity Score: {best_similarity:.3f}")
    print(f"Headers in cluster: {len(headers_in_cluster)}")
    print("Top headers in cluster:")
    for header in headers_in_cluster[:5]:  # Show top 5 headers
        page = header.get('page', 'N/A')
        print(f"  - '{header['text']}' (page {page})")
    print("Similarities to other clusters:")
    for cluster, sim in list(sorted_similarities.items())[:5]:  # Show top 5 similarities
        print(f"  - {cluster}: {sim:.3f}")

    return {
        "predicted_cluster": cluster_key,
        "cluster_name": cluster_name,
        "cluster_headers": headers_in_cluster,
        "similarity_score": best_similarity,
        "all_cluster_similarities": sorted_similarities,
        "query": query,
        "expanded_query": expanded_query
    }

In [110]:
query = "where can I find the VIN number"
result = find_cluster_for_query(query, hdbscan_results, header_data)
print(f"Query belongs to cluster: {result['cluster_name']} (similarity: {result['similarity_score']:.3f})")

Original query: 'where can I find the VIN number'
Expanded query: 'where can I find the VIN number figure numeral turn vin act number bit telephone number issue phone number identification number routine'
== Testing Cluster Prediction ===

Query: 'where can I find the VIN number'
Predicted Cluster: cluster_23 - engine, number, vehicle, view, ventilation seats
Similarity Score: 0.636
Headers in cluster: 3
Top headers in cluster:
  - 'Engine' (page 30)
  - 'Vehicle Identification Number (VIN)' (page 37)
  - 'Engine Number' (page 38)
Similarities to other clusters:
  - engine, number, vehicle, view, ventilation seats: 0.636
  - system, securing child, securing, child restraint, child: 0.588
  - label, air, vehicle, view, ventilation seats: 0.579
  - belt use, use, seat belt, belt, seat: 0.558
  - overview, control, center, ventilation seats, view: 0.556
Query belongs to cluster: engine, number, vehicle, view, ventilation seats (similarity: 0.636)
