In [2]:
import os
import re
from collections import Counter
from sklearn.base import BaseEstimator, TransformerMixin
from sentence_transformers import SentenceTransformer
import numpy as np
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams
from qdrant_client.models import PointStruct
import fitz


  from .autonotebook import tqdm as notebook_tqdm


In [4]:
class PDFTextCleaner(BaseEstimator, TransformerMixin):
    def __init__(self, remove_numbers=False, remove_special=True, lower=True,
                 remove_graphics=True, remove_headers_footers=True):
        self.remove_numbers = remove_numbers
        self.remove_special = remove_special
        self.lower = lower
        self.remove_graphics = remove_graphics
        self.remove_headers_footers = remove_headers_footers

    def _extract_pages_text(self, pdf_path):
        pages = []
        with fitz.open(pdf_path) as doc:
            for page in doc:
                text = page.get_text()
                if text:
                    pages.append(text)
        return pages

    def _most_common_nonempty(self, lines):
        nonempty_lines = [line for line in lines if line]
        if not nonempty_lines:
            return ''
        counter = Counter(nonempty_lines)
        most_common_line, count = counter.most_common(1)[0]
        return most_common_line if count > 1 else ''

    def _detect_and_remove_headers_footers(self, pages_text):
        first_lines = [page_text.split('\n')[0].strip() if page_text else '' for page_text in pages_text]
        last_lines = [page_text.split('\n')[-1].strip() if page_text else '' for page_text in pages_text]
        header = self._most_common_nonempty(first_lines)
        footer = self._most_common_nonempty(last_lines)
        cleaned_pages = []
        for text in pages_text:
            lines = text.split('\n')
            if lines and lines[0].strip() == header:
                lines = lines[1:]
            if lines and lines[-1].strip() == footer:
                lines = lines[:-1]
            cleaned_pages.append('\n'.join(lines))
        return cleaned_pages

    def _remove_graphics_text(self, text):
        text = re.sub(r'\b(Figure|Fig|Table|Chart|Graph|Diagram|Plot|Image|Illustration)\s*\d+', '', text, flags=re.I)
        lines = text.split('\n')
        filtered_lines = []
        for line in lines:
            if re.search(r'[\|\-\+═]+', line):
                continue
            if len(line.split()) > 3 and re.search(r'\s{3,}', line):
                continue
            filtered_lines.append(line)
        return '\n'.join(filtered_lines)

    def _clean_text(self, text):
        if self.lower:
            text = text.lower()
        text = re.sub(r'\s+', ' ', text)  # Normalize whitespace
        if self.remove_numbers:
            text = re.sub(r'\d+', '', text)
        if self.remove_special:
            text = re.sub(r'[^\w\s]', '', text)
        return text.strip()

    def fit(self, X, y=None):
        self.is_fitted_ = True
        return self

    def transform(self, folder_paths):
        if isinstance(folder_paths, str):
            folder_paths = [folder_paths]
        cleaned_texts = []
        for folder in folder_paths:
            for filename in os.listdir(folder):
                if filename.lower().endswith('.pdf'):
                    pdf_path = os.path.join(folder, filename)
                    try:
                        pages_text = self._extract_pages_text(pdf_path)
                        if self.remove_headers_footers:
                            pages_text = self._detect_and_remove_headers_footers(pages_text)
                        full_text = '\n'.join(pages_text)
                        if self.remove_graphics:
                            full_text = self._remove_graphics_text(full_text)
                        cleaned_text = self._clean_text(full_text)
                        if cleaned_text:
                            cleaned_texts.append(cleaned_text)
                    except Exception as e:
                        print(f"Error processing {pdf_path}: {e}")
        print(f"[PDFTextCleaner] Number of cleaned documents: {len(cleaned_texts)}")
        return cleaned_texts

In [10]:
book = "ingestion_source"
clnr = PDFTextCleaner()
text = clnr.transform(book)
# print(text)    # List of all books cleaned
# # OR to see just first book's text:
# if text:
#     print(text[1000:1050])
# else:
#     print("No cleaned texts found.")


[PDFTextCleaner] Number of cleaned documents: 1


In [11]:
class TextChunker(BaseEstimator, TransformerMixin):
    def __init__(self, max_chunk_chars=2000, chunk_overlap=800):
        self.max_chunk_chars = max_chunk_chars
        self.chunk_overlap = chunk_overlap

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        all_chunks = []
        for doc in X:
            start = 0
            doc_len = len(doc)
            while start < doc_len:
                end = min(start + self.max_chunk_chars, doc_len)
                chunk = doc[start:end]
                all_chunks.append(chunk)
                start += self.max_chunk_chars - self.chunk_overlap
        print(f"[TextChunker] Number of chunks created: {len(all_chunks)}")
        return all_chunks

In [15]:
chunker =TextChunker()
text_chunks = chunker.transform(text)
print(f"Number of text chunks: {len(text_chunks)}")
print(text_chunks[150][:500])  # Print first 100 characters of the first chunk for verification

[TextChunker] Number of chunks created: 4115
Number of text chunks: 4115
 an important noninvasive tool for evaluating patients cles is met by increases in heart rate and cardiac output in patients with significant cad the increase in myocardial oxygen demand cannot be met by a proportional increase in coronary blood flow and myocardial ischemia may produce chest pain and characteristic ecg a b trast image courtesy sheldon e litwin md division of cardiology university of utah salt lake city utah v mv la lv a v la b fig 413 transesophageal echocardiogram demonstrates 


In [24]:
class EmbeddingTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, model_name='sentence-transformers/biomed-roberta-base-sapbert-nli'):
        self.model_name = model_name

    def fit(self, X, y=None):
        # No persistent model; always create on transform
        return self

    def _get_device(self):
        import torch
        return 'cuda' if torch.cuda.is_available() else 'cpu'

    def transform(self, X):
        import numpy as np
        from sentence_transformers import SentenceTransformer
        device = self._get_device()
        model = SentenceTransformer(self.model_name, device=device)
        print(f"[EmbeddingTransformer] Using device: {device.upper()}")
        embeddings = model.encode(X, show_progress_bar=False)
        embeddings = np.array(embeddings)
        if np.isnan(embeddings).any() or np.isinf(embeddings).any():
            print("Warning: Embeddings contain NaN or inf!")
        print(f"[EmbeddingTransformer] Number of embeddings created: {len(embeddings)}")
        return (embeddings, X)

In [25]:
embedder = EmbeddingTransformer()
embeddings, texts = embedder.transform(text_chunks)
print(f"Number of embeddings: {len(embeddings)}")
print(embeddings[0][:10])  # Print first 10 values of the first embedding for verification

No sentence-transformers model found with name sentence-transformers/biomed-roberta-base-sapbert-nli. Creating a new one with mean pooling.


OSError: sentence-transformers/biomed-roberta-base-sapbert-nli is not a local folder and is not a valid model identifier listed on 'https://huggingface.co/models'
If this is a private repository, make sure to pass a token having permission to this repo either by logging in with `huggingface-cli login` or by passing `token=<your_token>`