In [None]:
import logging
import os
import re
from dataclasses import dataclass
from typing import Optional

import chromadb
import numpy as np
import pdfplumber
import PyPDF2
import spacy
import torch # noqa
from chromadb.config import Settings  # noqa
from langchain_text_splitters import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
from transformers import AutoConfig, AutoModelForSeq2SeqLM

from config import Config # noqa
from vector_store import VectorStore

In [58]:
# pdfminer is chatty while parsing PDFs; only let its errors through.
# Both the page-level and the layout-level loggers are raised to ERROR.
for _noisy_logger in ("pdfminer.pdfpage", "pdfminer.layout"):
    logging.getLogger(_noisy_logger).setLevel(logging.ERROR)

In [None]:
@dataclass
class Chunk:
    """One piece of chunked document text together with its bookkeeping data."""
    content: str  # the chunk text itself
    start_pos: int  # character offset of the chunk start, as reported by the code that built it
    end_pos: int  # character offset of the chunk end, as reported by the code that built it
    chunk_type: str  # e.g. 'paragraph', 'small_paragraph', 'merged_paragraph', 'sliding_window', 'section'
    section: Optional[str] = None  # name of the paper section the chunk belongs to, if known
    metadata: Optional[dict] = None  # extra per-chunk details; None when nothing was recorded


class SemanticChunker:
    def __init__(self,
                 embedding_model: str = "all-MiniLM-L6-v2",
                 min_chunk_size: int = 100,
                 max_chunk_size: int = 1000,
                 overlap_sentences: int = 2,
                 similarity_threshold: float = 0.7):
        """Load the NLP models and store the chunking parameters.

        Args:
            embedding_model: SentenceTransformer model name used for the
                similarity check when merging chunks.
            min_chunk_size: Paragraphs at least this many characters long are
                typed "paragraph"; shorter ones "small_paragraph".
            max_chunk_size: Upper bound (characters) for merged chunks and for
                sliding-window chunks.
            overlap_sentences: Number of sentences shared between consecutive
                sliding-window chunks.
            similarity_threshold: Minimum similarity for two adjacent chunks
                to be merged.

        Raises:
            ValueError: If the size/overlap parameters are inconsistent.
        """
        # Validate before the (slow) model loading so mistakes surface early.
        if min_chunk_size > max_chunk_size:
            raise ValueError("min_chunk_size must not exceed max_chunk_size")
        if overlap_sentences < 0:
            raise ValueError("overlap_sentences must be non-negative")

        # spaCy is only used for sentence segmentation: drop the parser and
        # NER components and switch on the dedicated "senter" component.
        self.nlp = spacy.load("en_core_web_sm", exclude=["parser", "ner"])
        self.nlp.enable_pipe("senter")

        # Embedding model used for semantic similarity between chunks
        self.embedding_model = SentenceTransformer(embedding_model)

        # Academic paper section patterns (matched against lowercased lines;
        # group 1 is the section name)
        self.section_patterns = [
            r'^(abstract|introduction|background|related work|methodology|methods|approach|model|architecture|experiments|results|evaluation|discussion|conclusion|references|acknowledgments?)\b',
            r'^\d+\.?\s+(introduction|background|methodology|results|discussion|conclusion)',
            r'^[IVX]+\.?\s+(introduction|background|methodology|results|discussion|conclusion)'
        ]

        # Configuration
        self.min_chunk_size = min_chunk_size
        self.max_chunk_size = max_chunk_size
        self.overlap_sentences = overlap_sentences
        self.similarity_threshold = similarity_threshold
    
    def detect_sections(self, text: str) -> list[tuple[str, int, int]]:
        """Detect academic paper sections and their boundaries."""
        sections = []
        lines = text.split('\n')
        current_section = "introduction"  # default
        section_start = 0
        
        for i, line in enumerate(lines):
            line_clean = line.strip().lower()
            
            # Skip empty lines
            if not line_clean:
                continue
                
            # Check if line matches section pattern
            for pattern in self.section_patterns:
                if re.match(pattern, line_clean):
                    # Close previous section
                    if sections or current_section != "introduction":
                        section_end = sum(len(lines[j]) + 1 for j in range(section_start, i))
                        sections.append((current_section, 
                                       sum(len(lines[j]) + 1 for j in range(section_start)), 
                                       section_end))
                    
                    # Start new section
                    current_section = re.match(pattern, line_clean).group(1)
                    section_start = i
                    break
        
        # Add final section
        if current_section:
            sections.append((current_section, 
                           sum(len(lines[j]) + 1 for j in range(section_start)), 
                           len(text)))
        
        return sections if sections else [("content", 0, len(text))]
    
    def split_into_sentences(self, text: str) -> list[tuple[str, int, int]]:
        """Split text into sentences with position tracking."""
        doc = self.nlp(text)
        sentences = []
        
        for sent in doc.sents:
            # Clean sentence text
            sent_text = sent.text.strip()
            if len(sent_text) > 20:  # Filter out very short sentences
                sentences.append((sent_text, sent.start_char, sent.end_char))
        
        return sentences
    
    def split_into_paragraphs(self, text: str) -> list[tuple[str, int, int]]:
        """Split text into paragraphs."""
        paragraphs = []
        current_pos = 0
        
        # Split by double newlines (paragraph breaks)
        para_texts = re.split(r'\n\s*\n', text)
        
        for para_text in para_texts:
            para_text = para_text.strip()
            if len(para_text) > 50:  # Filter short paragraphs
                start_pos = text.find(para_text, current_pos)
                if start_pos != -1:
                    end_pos = start_pos + len(para_text)
                    paragraphs.append((para_text, start_pos, end_pos))
                    current_pos = end_pos
        
        return paragraphs
    
    def calculate_semantic_similarity(self, texts: list[str]) -> np.ndarray:
        """Calculate semantic similarity matrix for texts."""
        if len(texts) < 2:
            return np.array([[1.0]])
        
        embeddings = self.embedding_model.encode(texts)
        similarity_matrix = cosine_similarity(embeddings)
        return similarity_matrix

    def merge_similar_chunks(self, chunks: list[Chunk]) -> list[Chunk]:
        """Merge semantically similar adjacent chunks."""
        if len(chunks) <= 1:
            return chunks
        
        # Get chunk texts for similarity calculation
        chunk_texts = [chunk.content for chunk in chunks]
        similarity_matrix = self.calculate_semantic_similarity(chunk_texts)
        
        merged_chunks = []
        i = 0
        
        while i < len(chunks):
            current_chunk = chunks[i]
            
            # Check if we can merge with next chunk
            if (i + 1 < len(chunks) and 
                len(current_chunk.content) < self.max_chunk_size and
                similarity_matrix[i][i + 1] > self.similarity_threshold):
                
                next_chunk = chunks[i + 1]
                
                # Merge chunks
                merged_content = current_chunk.content + "\n\n" + next_chunk.content
                if len(merged_content) <= self.max_chunk_size:
                    merged_chunk = Chunk(
                        content=merged_content,
                        start_pos=current_chunk.start_pos,
                        end_pos=next_chunk.end_pos,
                        chunk_type="merged_paragraph",
                        section=current_chunk.section,
                        metadata={
                            "merged_from": [current_chunk.chunk_type, next_chunk.chunk_type],
                            "semantic_similarity": similarity_matrix[i][i + 1]
                        }
                    )
                    merged_chunks.append(merged_chunk)
                    i += 2  # Skip next chunk since it's merged
                    continue
            
            merged_chunks.append(current_chunk)
            i += 1
        
        return merged_chunks
    
    def create_sliding_window_chunks(self,
                                    sentences: list[tuple[str, int, int]],
                                    section: str) -> list[Chunk]:
        """Create overlapping chunks with semantic awareness."""
        chunks = []
        i = 0
        
        while i < len(sentences):
            chunk_sentences = []
            chunk_length = 0
            start_idx = i
            
            # Build chunk up to max size
            while (i < len(sentences) and 
                   chunk_length + len(sentences[i][0]) < self.max_chunk_size):
                chunk_sentences.append(sentences[i])
                chunk_length += len(sentences[i][0])
                i += 1
            
            if chunk_sentences:
                chunk_content = " ".join([sent[0] for sent in chunk_sentences])
                
                chunk = Chunk(
                    content=chunk_content,
                    start_pos=chunk_sentences[0][1],
                    end_pos=chunk_sentences[-1][2],
                    chunk_type="sliding_window",
                    section=section,
                    metadata={
                        "sentence_count": len(chunk_sentences),
                        "start_sentence_idx": start_idx,
                        "end_sentence_idx": i - 1
                    }
                )
                chunks.append(chunk)
                
                # Move back for overlap (but ensure progress)
                overlap_back = min(self.overlap_sentences, len(chunk_sentences) - 1, i - start_idx - 1)
                i = max(start_idx + 1, i - overlap_back)
        
        return chunks
    
    def chunk_text_semantically(self, text: str, source: str) -> list[dict]:
        """Main semantic chunking method."""
        # Step 1: Detect sections
        sections = self.detect_sections(text)
        
        all_chunks = []
        
        for section_name, section_start, section_end in sections:
            section_text = text[section_start:section_end]
            
            # Step 2: Split into paragraphs
            paragraphs = self.split_into_paragraphs(section_text)
            
            # Step 3: Create paragraph-level chunks
            paragraph_chunks = []
            for para_text, para_start, para_end in paragraphs:
                if len(para_text) >= self.min_chunk_size:
                    chunk = Chunk(
                        content=para_text,
                        start_pos=section_start + para_start,
                        end_pos=section_start + para_end,
                        chunk_type="paragraph",
                        section=section_name,
                        metadata={"paragraph_length": len(para_text)}
                    )
                    paragraph_chunks.append(chunk)
                elif len(para_text) > 20:  # Small paragraphs - will be merged
                    chunk = Chunk(
                        content=para_text,
                        start_pos=section_start + para_start,
                        end_pos=section_start + para_end,
                        chunk_type="small_paragraph",
                        section=section_name,
                        metadata={"paragraph_length": len(para_text)}
                    )
                    paragraph_chunks.append(chunk)
            
            # Step 4: Merge small paragraphs
            paragraph_chunks = self.merge_similar_chunks(paragraph_chunks)
            
            # Step 5: Handle large paragraphs with sliding window
            section_chunks = []
            for chunk in paragraph_chunks:
                if len(chunk.content) > self.max_chunk_size:
                    # Split large paragraph into sentences and use sliding window
                    sentences = self.split_into_sentences(chunk.content)
                    sliding_chunks = self.create_sliding_window_chunks(sentences, section_name)
                    section_chunks.extend(sliding_chunks)
                else:
                    section_chunks.append(chunk)
            
            all_chunks.extend(section_chunks)
        
        # Step 6: Create hierarchical chunks (section-level)
        hierarchical_chunks = []
        for section_name, section_start, section_end in sections:
            section_text = text[section_start:section_end]
            if len(section_text) > self.min_chunk_size:
                section_chunk = Chunk(
                    content=section_text,
                    start_pos=section_start,
                    end_pos=section_end,
                    chunk_type="section",
                    section=section_name,
                    metadata={
                        "section_length": len(section_text),
                        "is_hierarchical": True
                    }
                )
                hierarchical_chunks.append(section_chunk)
        
        # Combine all chunks
        all_chunks.extend(hierarchical_chunks)
        
        # Step 7: Convert to output format
        documents = []
        for i, chunk in enumerate(all_chunks):
            documents.append({
                "content": chunk.content,
                "metadata": {
                    "source": source,
                    "chunk_id": i,
                    "chunk_type": chunk.chunk_type,
                    "section": chunk.section,
                    "start_pos": chunk.start_pos,
                    "end_pos": chunk.end_pos,
                    "total_chunks": len(all_chunks),
                    **(chunk.metadata or {})
                }
            })
        
        return documents


# Example usage and configuration
class Config:
    """Notebook-local settings container.

    NOTE: this definition replaces the ``Config`` name imported from the
    ``config`` module in the imports cell once this cell has run.
    """
    # Semantic chunking configuration
    # NOTE(review): SemanticChunker in this notebook is constructed without
    # reading these four values, and MAX_CHUNK_SIZE differs from the chunker's
    # own default of 1000 - confirm which value is intended.
    MIN_CHUNK_SIZE = 100
    MAX_CHUNK_SIZE = 800
    OVERLAP_SENTENCES = 2
    SIMILARITY_THRESHOLD = 0.7
    
    # Embedding model for semantic analysis (read by VectorStore)
    EMBEDDING_MODEL = "all-MiniLM-L6-v2"  # or "allenai-specter" for scientific papers
    
    # Original config
    CHROMA_PERSIST_DIRECTORY = "./chroma_db"  # path handed to chromadb.PersistentClient
    TOP_K_RETRIEVAL = 5  # default number of results for VectorStore.search

In [None]:
class SemanticDocumentProcessor:
    """Extract text from a PDF and turn it into semantic chunks."""

    def __init__(self):
        """Initialize the document processor with semantic chunking."""
        self.semantic_chunker = SemanticChunker()

    def extract_text_from_pdf(self, pdf_path: str) -> str:
        """Extract text from a PDF file using pdfplumber.

        Args:
            pdf_path: Path of the PDF to read.

        Returns:
            The text of all pages that yielded text, each followed by a
            newline, with leading/trailing whitespace stripped.

        Raises:
            Exception: If the file cannot be opened or parsed. The original
                error is attached as ``__cause__``.
        """
        try:
            with pdfplumber.open(pdf_path) as pdf:
                page_texts = [
                    page.extract_text(x_tolerance=2, use_text_flow=True)
                    for page in pdf.pages
                ]
        except Exception as e:
            # Chain explicitly so the underlying error stays visible.
            raise Exception(f"Error reading PDF {pdf_path}: {str(e)}") from e

        # Pages without extractable text come back empty and are skipped.
        return "".join(page_text + "\n" for page_text in page_texts if page_text).strip()

    def process_pdf(self, pdf_path: str) -> list[dict]:
        """Run the complete PDF pipeline: extract text, then chunk it.

        Args:
            pdf_path: Path of the PDF to process.

        Returns:
            The chunk dictionaries produced by the semantic chunker, or an
            empty list when no text could be extracted.
        """
        filename = os.path.basename(pdf_path)
        print(f"Processing {filename} with semantic chunking...")

        text = self.extract_text_from_pdf(pdf_path)
        if not text:
            print(f"No text extracted from {filename}.")
            return []

        # Use semantic chunking instead of fixed-size chunking
        documents = self.semantic_chunker.chunk_text_semantically(text, filename)

        print(f"Processed {filename} into {len(documents)} semantic chunks.")
        print(f"Chunk types: {set(doc['metadata']['chunk_type'] for doc in documents)}")

        return documents

In [62]:
class VectorStore:
    def __init__(self):
        """Open the persistent Chroma store and load the embedding model.

        The collection is created with cosine distance so that the
        ``1 - distance`` conversion done in ``search`` yields a cosine
        similarity. Chroma's default metric is (squared) L2, for which that
        conversion is not a similarity.

        NOTE(review): a collection that already exists on disk keeps the
        metric it was created with - rebuild the store if it predates this
        setting.
        """
        # Initialize Chroma client
        self.client = chromadb.PersistentClient(path=Config.CHROMA_PERSIST_DIRECTORY)

        # Initialize embedding model
        self.embedding_model = SentenceTransformer(Config.EMBEDDING_MODEL)

        # Get or create collection
        self.collection = self.client.get_or_create_collection(
            name="academic_papers",
            metadata={
                "description": "Academic research papers collection",
                "hnsw:space": "cosine"
            }
        )

    def add_documents(self, documents: list[dict]) -> None:
        """Embed documents and add them to the collection.

        Args:
            documents: Dictionaries with a ``"content"`` string and a
                ``"metadata"`` dict that contains at least ``"source"`` and
                ``"chunk_id"`` (used to build the record id).

        Metadata values are coerced to the primitive types Chroma accepts:
        NumPy scalars become Python scalars, sequences become a
        comma-separated string, ``None`` entries are dropped and anything
        else is stringified. An empty ``documents`` list is a no-op.
        """
        if not documents:
            return

        def _clean(metadata: dict) -> dict:
            """Return a copy of ``metadata`` holding only str/int/float/bool values."""
            cleaned = {}
            for key, value in metadata.items():
                if value is None:
                    continue
                if isinstance(value, np.generic):
                    value = value.item()
                elif isinstance(value, (list, tuple, set)):
                    value = ", ".join(str(item) for item in value)
                elif not isinstance(value, (str, int, float, bool)):
                    value = str(value)
                cleaned[key] = value
            return cleaned

        texts = [doc["content"] for doc in documents]
        metadatas = [_clean(doc["metadata"]) for doc in documents]

        # Generate embeddings
        embeddings = self.embedding_model.encode(texts).tolist()

        # Generate IDs
        ids = [f"{doc['metadata']['source']}_{doc['metadata']['chunk_id']}"
               for doc in documents]

        # Add to collection
        self.collection.add(
            embeddings=embeddings,
            documents=texts,
            metadatas=metadatas,
            ids=ids
        )

    def search(self, query: str, top_k: int = Config.TOP_K_RETRIEVAL) -> list[dict]:
        """Return the stored documents closest to ``query``.

        Args:
            query: Free-text query; it is embedded with the store's model.
            top_k: Maximum number of results requested from the collection.

        Returns:
            One dict per hit with ``content``, ``metadata`` and
            ``similarity_score`` (computed as ``1 - distance``).
            NOTE(review): that is a cosine similarity only if the collection
            uses cosine distance - confirm the collection's metric.
        """
        hits = self.collection.query(
            query_embeddings=self.embedding_model.encode([query]).tolist(),
            n_results=top_k,
            include=["documents", "metadatas", "distances"]
        )

        if not hits["documents"]:
            return []

        # Only one query was sent, so every result list has a single row.
        contents = hits["documents"][0]
        metadatas = hits["metadatas"][0]
        distances = hits["distances"][0]

        return [
            {
                "content": content,
                "metadata": metadatas[rank],
                "similarity_score": 1 - distances[rank]  # Convert distance to similarity
            }
            for rank, content in enumerate(contents)
        ]

    def get_collection_stats(self) -> dict:
        """Report the number of stored records and the configured embedding model name."""
        stats = {"total_documents": self.collection.count()}
        stats["embedding_model"] = Config.EMBEDDING_MODEL
        return stats


In [None]:
class RAGPipeline:
    """Retrieve relevant chunks from the vector store and answer with a local LLM."""

    # Number of retrieved documents placed into the prompt
    MAX_CONTEXT_DOCS = 3
    # Per-document character budget inside the prompt
    MAX_CONTEXT_CHARS = 400

    def __init__(self, model_name: str = "google/flan-t5-base"):
        """Create the vector store and load the generation model.

        Args:
            model_name: Hugging Face model id. Encoder-decoder models (such as
                the default Flan-T5) are loaded as seq2seq models with a
                "text2text-generation" pipeline; all others as causal LMs with
                a "text-generation" pipeline.

        If loading fails the error is printed and ``self.generator`` is set to
        ``None``; ``generate_response`` then returns a fallback answer.
        """
        self.vector_store = VectorStore()
        self.PROMPT_TEMPLATE = """
        You're an AI research assistant analyzing academic papers. Use the context to answer scientifically.

        **STRUCTURED CONTEXT:**
        {context}

        **ANALYSIS TASK:**
        1. Identify key claims relevant to: "{query}"
        2. Cross-reference across sources
        3. Generate evidence-based response
        4. Cite sources [1-3] where applicable

        **RESPONSE:**
        """
        # Initialize free local model
        print("Loading language model... (this may take a few minutes first time)")

        try:
            self.tokenizer = AutoTokenizer.from_pretrained(model_name)

            # Pick the model class that matches the architecture: T5-style
            # checkpoints cannot be loaded through AutoModelForCausalLM.
            if AutoConfig.from_pretrained(model_name).is_encoder_decoder:
                model_cls, task = AutoModelForSeq2SeqLM, "text2text-generation"
            else:
                model_cls, task = AutoModelForCausalLM, "text-generation"
            self.model = model_cls.from_pretrained(model_name, use_safetensors=True)

            # Fall back to EOS for padding when the tokenizer has no pad token
            pad_token_id = self.tokenizer.pad_token_id
            if pad_token_id is None:
                pad_token_id = self.tokenizer.eos_token_id

            # Output length and decoding strategy are set per call in
            # generate_response, so they are not fixed here.
            self.generator = pipeline(
                task,
                model=self.model,
                tokenizer=self.tokenizer,
                pad_token_id=pad_token_id
            )

            print("✅ Language model loaded successfully!")

        except Exception as e:
            print(f"❌ Error loading model: {e}")
            # Fallback to a simpler approach
            self.generator = None

    @staticmethod
    def format_context(docs: list[dict]) -> str:
        """Render retrieved documents as a structured context block.

        Args:
            docs: Search results; each needs ``content`` and a ``metadata``
                dict with ``source`` and ``chunk_type`` (``section`` optional).

        Returns:
            One entry per document (source header, section line, chunk type,
            content) joined by blank lines. Content longer than 400 characters
            is cut at the last period inside the first 400 characters, or at
            400 characters when there is none, and marked with " [...]".
        """
        context_lines = []
        for i, doc in enumerate(docs):
            metadata = doc['metadata']

            # PRESERVE STRUCTURE
            header = f"SOURCE {i+1} | {metadata['source']}"
            section = f"SECTION: {metadata['section']}" if 'section' in metadata else ""
            chunk_type = f"CHUNK TYPE: {metadata['chunk_type']}"

            # SMART TRUNCATION (preserve sentence boundaries)
            content = doc['content']
            if len(content) > RAGPipeline.MAX_CONTEXT_CHARS:
                head = content[:RAGPipeline.MAX_CONTEXT_CHARS]
                last_period = head.rfind('.')
                if last_period != -1:
                    head = head[:last_period + 1]
                # Without a period the hard cut is kept instead of dropping
                # the whole content.
                content = head + " [...]"

            context_lines.append(f"{header}\n{section}\n{chunk_type}\n{content}\n")

        return "\n\n".join(context_lines)

    def postprocess(self, raw_response, context_docs):
        """Extract the answer and the sources it cites from a raw pipeline response.

        Args:
            raw_response: Pipeline output; ``raw_response[0]['generated_text']``
                is read.
            context_docs: The documents that were numbered [1], [2], ... in
                the prompt.

        Returns:
            Dict with ``answer`` (text after the last "RESPONSE:"),
            ``sources`` (sources of the documents whose ``[n]`` marker appears
            in the answer) and ``context_used`` (number of context docs).
        """
        # Extract answer
        answer = raw_response[0]['generated_text'].split("RESPONSE:")[-1].strip()

        # Keep only the sources that are actually cited as [n]
        valid_sources = [
            doc['metadata']['source']
            for i, doc in enumerate(context_docs)
            if f"[{i+1}]" in answer
        ]

        return {
            "answer": answer,
            "sources": valid_sources,
            "context_used": len(context_docs)
        }

    def generate_response(self, query: str, context_docs: list[dict]) -> dict:
        """Generate a response using the retrieved context.

        Only the first ``MAX_CONTEXT_DOCS`` documents are placed in the prompt;
        ``sources`` and ``context_used`` describe exactly those documents.

        Args:
            query: The user question.
            context_docs: Retrieved documents, most relevant first.

        Returns:
            Dict with ``answer``, ``sources`` and ``context_used``. When the
            model is unavailable or generation fails, ``answer`` carries the
            message and the other two fields are empty / zero.
        """
        if not self.generator:
            return {
                "answer": "Language model not available. Please check your setup.",
                "sources": [],
                "context_used": 0
            }

        # Prepare context (keep it shorter for local models)
        used_docs = context_docs[:self.MAX_CONTEXT_DOCS]
        context = self.format_context(used_docs)

        # DYNAMIC prompt construction
        prompt = self.PROMPT_TEMPLATE.format(context=context, query=query)

        try:
            # Deterministic beam search. No temperature is passed: it only
            # applies when sampling and conflicts with do_sample=False.
            response = self.generator(
                prompt,
                max_new_tokens=350,
                repetition_penalty=1.2,
                num_beams=5,      # Better than greedy search
                do_sample=False
            )

            # Extract the generated text
            generated_text = response[0]['generated_text']

            # Causal pipelines echo the prompt, seq2seq pipelines do not:
            # strip it only when it is really there.
            if generated_text.startswith(prompt):
                generated_text = generated_text[len(prompt):]

            # Get just the answer part (after "Answer:")
            if "Answer:" in generated_text:
                answer = generated_text.split("Answer:")[-1].strip()
            else:
                answer = generated_text.strip()

            return {
                "answer": answer,
                "sources": [doc['metadata']['source'] for doc in used_docs],
                "context_used": len(used_docs)
            }

        except Exception as e:
            return {
                "answer": f"Error generating response: {str(e)}",
                "sources": [],
                "context_used": 0
            }

    def query(self, question: str) -> dict:
        """Run the complete RAG pipeline: retrieve, then generate.

        Returns:
            The dict produced by ``generate_response``, or a "no documents"
            answer when the search returns nothing.
        """
        # Retrieve relevant documents
        relevant_docs = self.vector_store.search(question)

        if not relevant_docs:
            return {
                "answer": "No relevant documents found in the database.",
                "sources": [],
                "context_used": 0
            }

        # Generate response
        return self.generate_response(question, relevant_docs)

In [64]:
processor = SemanticDocumentProcessor()

# Directory holding the raw PDFs. Set ACADEMIC_RAG_DATA_DIR to run the
# notebook on another machine; the default is the original author's location.
data_dir = os.environ.get(
    "ACADEMIC_RAG_DATA_DIR",
    "/Users/dominikklingshirn/Projects/academic_rag/data/raw",
)
file_path = os.path.join(data_dir, "BERT_pre-training_deep_bidirectional_transformers.pdf")

In [65]:
# Extract the PDF text and chunk it; `docs` is a list of {"content", "metadata"} dicts.
docs = processor.process_pdf(file_path)

Processing BERT_pre-training_deep_bidirectional_transformers.pdf with semantic chunking...
Processed BERT_pre-training_deep_bidirectional_transformers.pdf into 25 semantic chunks.
Chunk types: {'sliding_window', 'paragraph', 'section'}


In [76]:
# Dump every chunk: its text, then its metadata, then a separator line.
for doc in docs:
    print(doc['content'])
    print('\n')
    for key, value in doc['metadata'].items():
        print(f"{key}: {value}")
    print('\n', '-' * 20, '\n')

Abstract
We introduce a new language representa-
tion model called BERT, which stands for
Bidirectional Encoder Representations from
Transformers. Unlike recent language repre-
sentation models (Peters et al., 2018a; Rad-
ford et al., 2018), BERT is designed to pre-
train deep bidirectional representations from
unlabeled text by jointly conditioning on both
left and right context in all layers. As a re-
sult, the pre-trained BERT model can be fine-
tuned with just one additional output layer
to create state-of-the-art models for a wide
range of tasks, such as question answering and
language inference, without substantial task-
specific architecture modifications.
BERT is conceptually simple and empirically
powerful. It obtains new state-of-the-art re-
sults on eleven natural language processing
tasks, including pushing the GLUE score to
80.5% (7.7% point absolute impro


source: BERT_pre-training_deep_bidirectional_transformers.pdf
chunk_id: 0
chunk_type: paragraph
section: abstract
st