## DOCUMENT PROCESSING

In [2]:
"""
Full hybrid PDF extraction pipeline:
- PDF operator analysis (text vs images)
- PyPDF extraction (fast)
- Tesseract OCR (medium)
- Docling (slow, high-quality) or optional transformer-based OCR fallback
- Composite quality scorer to decide escalation

Example run at bottom
"""

import re
import io
import math
import tempfile
import pytesseract
import concurrent.futures
from pypdf import PdfReader
from functools import lru_cache
from typing import Tuple, Dict, List
from pdf2image import convert_from_path
from concurrent.futures import ThreadPoolExecutor


# Optional imports
try:
    import docling
    HAS_DOCLING = True
except Exception:
    HAS_DOCLING = False

try:
    from wordfreq import zipf_frequency
    HAS_WORD_FREQ = True
except Exception:
    HAS_WORD_FREQ = False

try:
    from langdetect import detect_langs
    HAS_LANGDETECT = True
except Exception:
    HAS_LANGDETECT = False

# Optional perplexity using transformers (slow)
try:
    from transformers import AutoTokenizer, AutoModelForCausalLM
    import torch
    HAS_TRANSFORMERS = True
except Exception:
    HAS_TRANSFORMERS = False


#########################
# 1) PDF structure analysis
#########################
def analyze_pdf_ops(filepath: str) -> List[Tuple[int, int, int]]:
    """
    Return a list of tuples (page_number, text_ops, image_ops)
    """
    reader = PdfReader(filepath)
    ops_summary = []

    for i, page in enumerate(reader.pages):
        # Get raw content stream(s)
        contents = page.get_contents()
        stream_data = b""
        if contents:
            # pypdf may return a single object or a list
            if isinstance(contents, list):
                for c in contents:
                    try:
                        stream_data += c.get_data()
                    except Exception:
                        pass
            else:
                try:
                    stream_data = contents.get_data()
                except Exception:
                    stream_data = b""

        # count common text operators and image "Do" operator
        text_ops = (
            stream_data.count(b"Tj")
            + stream_data.count(b"TJ")
            + stream_data.count(b"Tf")
            + stream_data.count(b"Td")
            + stream_data.count(b"TD")
            + stream_data.count(b"Tm")
            + stream_data.count(b"T*")
        )
        image_ops = stream_data.count(b"Do")
        ops_summary.append((i + 1, text_ops, image_ops))

    return ops_summary


#########################
# 2) Extract using PyPDF (born-digital)
#########################
def extract_text_pypdf(filepath: str, page_index: int) -> str:
    reader = PdfReader(filepath)
    page = reader.pages[page_index]
    text = page.extract_text() or ""
    return text.strip()


#########################
# 3) OCR functions
#########################
def ocr_tesseract_image(image) -> str:
    """Run pytesseract on a PIL image object"""
    return pytesseract.image_to_string(image)


def ocr_docling_image(image) -> str:
    """Run docling (or fallback) on a PIL image object.
    This function expects docling to be installed & configured.
    If not available, returns None.
    """
    if not HAS_DOCLING:
        raise RuntimeError("Docling is not available in this environment.")
    # Docling usage will depend on its API — below is a placeholder example.
    # Replace with the actual docling invocation in your environment.
    # Example (pseudocode):
    # result = docling.ocr_image(image)
    # return result["text"]
    return docling.ocr_image_to_text(image)


#########################
# 4) Quality evaluation
#########################
RE_BAD_CHAR = re.compile(r"[^\x00-\x7F\u00A0-\u017F]")  # many non-ascii/unusual chars
RE_WORD = re.compile(r"[A-Za-zÀ-ÖØ-öø-ÿ'’-]{2,}")  # words with accents, hyphens

def garbage_ratio(text: str) -> float:
    if not text:
        return 1.0
    total = len(text)
    bad = len(RE_BAD_CHAR.findall(text))
    return bad / max(1, total)


def word_dictionary_fraction(text: str) -> float:
    """
    A heuristic: fraction of words that appear to be real based on wordfreq zipf frequency.
    If wordfreq not available, fall back to a simple heuristic (letters-only words / total words).
    """
    words = RE_WORD.findall(text)
    if not words:
        return 0.0

    if HAS_WORD_FREQ:
        valid = 0
        for w in words:
            # zipf_frequency returns a score -4..7; treat > 3.0 as likely real/common word
            if zipf_frequency(w.lower(), "en") > 1.5:
                valid += 1
        return valid / len(words)
    else:
        # fallback: count words that have >2 letters and at least one vowel
        def looks_real(w):
            return (len(w) > 2) and (re.search(r"[aeiouyAEIOUYàèéùôî]", w) is not None)
        valid = sum(1 for w in words if looks_real(w))
        return valid / len(words)


def detect_primary_language_confidence(text: str) -> float:
    """
    Returns a confidence-like score (0..1) that language detection is stable / strong.
    Uses langdetect if available.
    """
    if not text or not HAS_LANGDETECT:
        return 0.0

    try:
        langs = detect_langs(text)
        if not langs:
            return 0.0
        top = langs[0]
        # top.prob (0..1)
        return float(top.prob)
    except Exception:
        return 0.0


def perplexity_score(text: str, model_name="gpt2") -> float:
    """
    Optional: compute (normalized) perplexity using a small transformer model.
    Returns a score in range (0..1) where 1 means low perplexity (good), 0 means high perplexity (bad).
    If transformers not available, return 0.5 (neutral).
    """
    if not HAS_TRANSFORMERS or not text:
        return 0.5

    # Keep short for speed
    try:
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForCausalLM.from_pretrained(model_name)
        model.eval()
        if torch.cuda.is_available():
            model.to("cuda")

        enc = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
        if torch.cuda.is_available():
            enc = {k: v.to("cuda") for k, v in enc.items()}

        with torch.no_grad():
            outputs = model(**enc, labels=enc["input_ids"])
            # average negative log-likelihood per token
            loss = outputs.loss.item()
            ppl = math.exp(loss)
            # normalize: for our use, convert to 0..1 with a stabilizing mapping
            # lower ppl -> better -> produce higher score
            score = 1.0 / (1.0 + math.log(1.0 + ppl))
            return float(score)
    except Exception:
        return 0.5


def evaluate_text_quality(text: str) -> Dict[str, float]:
    """
    Return a dict with component scores and a combined 'score' (0..1).
    Higher is better.
    """
    if not text:
        return {
            "garbage_ratio": 1.0,
            "dictionary_fraction": 0.0,
            "lang_conf": 0.0,
            "perplexity_score": 0.5,
            "score": 0.0,
        }

    g = garbage_ratio(text)  # lower better
    d = word_dictionary_fraction(text)  # higher better
    l = detect_primary_language_confidence(text)  # higher better
    p = perplexity_score(text)  # higher better

    # normalize garbage to 0..1 where 1 = perfect (no garbage)
    garbage_ok = max(0.0, 1.0 - g)

    # Compose weighted score
    score = (0.35 * d) + (0.25 * garbage_ok) + (0.25 * p) + (0.15 * l)
    score = max(0.0, min(1.0, score))

    return {
        "garbage_ratio": g,
        "dictionary_fraction": d,
        "lang_conf": l,
        "perplexity_score": p,
        "score": score,
    }


#########################
# 5) Page-level processing logic
#########################
def process_page(filepath: str, page_index: int, dpi: int = 300,
                 min_text_ops_for_born_digital: int = 10,
                 tesseract_threshold: float = 0.45,
                 docling_threshold: float = 0.30) -> Dict:
    """
    Process a single page:
      - If operator analysis shows many text ops -> use PyPDF text
      - Else convert page to image, run Tesseract, score it
      - If score low -> run Docling (or slow fallback)
    Returns dict with keys: 'final_text', 'extraction_method', 'scores', 'pageno'
    """
    # 1) operator-level check
    reader = PdfReader(filepath)
    page = reader.pages[page_index]
    contents = page.get_contents()
    stream_data = b""
    if contents:
        if isinstance(contents, list):
            for c in contents:
                try:
                    stream_data += c.get_data()
                except Exception:
                    pass
        else:
            try:
                stream_data = contents.get_data()
            except Exception:
                stream_data = b""
    text_ops = (
        stream_data.count(b"Tj")
        + stream_data.count(b"TJ")
        + stream_data.count(b"Tf")
        + stream_data.count(b"Td")
        + stream_data.count(b"TD")
        + stream_data.count(b"Tm")
        + stream_data.count(b"T*")
    )
    image_ops = stream_data.count(b"Do")

    # 2) If many text ops -> born-digital
    if text_ops >= min_text_ops_for_born_digital and text_ops > image_ops:
        extracted = page.extract_text() or ""
        scores = evaluate_text_quality(extracted)
        return {
            "pageno": page_index + 1,
            "final_text": extracted,
            "extraction_method": "pypdf-born-digital",
            "scores": scores,
            "text_ops": text_ops,
            "image_ops": image_ops,
        }

    # 3) Otherwise, produce page image and OCR with Tesseract
    images = convert_from_path(filepath, first_page=page_index + 1, last_page=page_index + 1, dpi=dpi)
    if not images:
        return {
            "pageno": page_index + 1,
            "final_text": "",
            "extraction_method": "empty-page",
            "scores": {"score": 0.0},
            "text_ops": text_ops,
            "image_ops": image_ops,
        }
    img = images[0]

    # If there is some extractable text (eg header), keep it to combine later
    pypdf_text = page.extract_text() or ""

    # Tesseract (fast)
    tesseract_text = ocr_tesseract_image(img)
    t_scores = evaluate_text_quality(tesseract_text)

    # If Tesseract quality is acceptable, use it
    if t_scores["score"] >= tesseract_threshold:
        # combine with pypdf small header if it exists and is not redundant
        combined = (pypdf_text + "\n" + tesseract_text).strip()
        return {
            "pageno": page_index + 1,
            "final_text": combined,
            "extraction_method": "tesseract",
            "scores": t_scores,
            "text_ops": text_ops,
            "image_ops": image_ops,
        }

    # 4) Escalate to Docling / high-quality OCR
    # Try docling if available
    slow_text = None
    if HAS_DOCLING:
        try:
            slow_text = ocr_docling_image(img)
        except Exception as e:
            slow_text = None

    # If docling not available, try transformer-based model (if present)
    if slow_text is None and HAS_TRANSFORMERS:
        try:
            # Example: apply a transformer OCR or vision model - pseudo
            # NOTE: Implement actual model-specific call here if you set it up.
            # For illustration only; many vision models take images differently.
            from PIL import Image
            # here we'd call a model to get better OCR - placeholder:
            slow_text = ocr_tesseract_image(img)  # fallback if no better model available
        except Exception:
            slow_text = None

    # Evaluate slow_text if we obtained one
    if slow_text:
        s_scores = evaluate_text_quality(slow_text)
        # If slow_text improved score, return it; else keep tesseract
        if s_scores["score"] >= t_scores["score"] or s_scores["score"] >= docling_threshold:
            combined = (pypdf_text + "\n" + slow_text).strip()
            return {
                "pageno": page_index + 1,
                "final_text": combined,
                "extraction_method": "docling" if HAS_DOCLING else "transformer_slow",
                "scores": s_scores,
                "text_ops": text_ops,
                "image_ops": image_ops,
            }

    # 5) fallback: return Tesseract result (best we have)
    return {
        "pageno": page_index + 1,
        "final_text": (pypdf_text + "\n" + tesseract_text).strip(),
        "extraction_method": "tesseract_fallback",
        "scores": t_scores,
        "text_ops": text_ops,
        "image_ops": image_ops,
    }


#########################
# 6) Process entire PDF
#########################

def process_pdf(filepath: str, dpi: int = 150, max_pages: int = None, 
               max_workers: int = 4, fast_mode: bool = True) -> Dict:
    """
    Optimized PDF processing with parallel execution and batch operations.
    """
    reader = PdfReader(filepath)
    n_pages = len(reader.pages)
    pages_to_process = range(n_pages) if max_pages is None else range(min(n_pages, max_pages))

    # FAST PATH: For large PDFs, use simplified processing
    if n_pages > 50 and fast_mode:
        return process_large_pdf_fast(filepath, dpi, max_pages, max_workers)

    # Standard processing for smaller PDFs
    needs_ocr_per_page = analyze_pdf_needs_ocr(filepath, n_pages)
    ocr_page_indices = [i for i, needs_ocr in enumerate(needs_ocr_per_page) if needs_ocr]
    all_images = []
    
    if ocr_page_indices:
        all_images = convert_from_path(
            filepath, 
            dpi=dpi,
            grayscale=True,
            thread_count=4,
            first_page=1,
            last_page=n_pages
        )
    
    image_map = {}
    if all_images:
        for i in ocr_page_indices:
            if i < len(all_images):
                image_map[i] = all_images[i]

    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_page = {
            executor.submit(
                process_page_optimized, 
                filepath, 
                i, 
                dpi, 
                image_map.get(i),
                fast_mode
            ): i 
            for i in pages_to_process
        }
        
        for future in concurrent.futures.as_completed(future_to_page):
            page_index = future_to_page[future]
            try:
                result = future.result()
                results.append(result)
            except Exception as e:
                print(f"Page {page_index + 1} failed: {e}")
                results.append(create_fallback_result(page_index))

    results.sort(key=lambda x: x["pageno"])
    
    summary = {
        "total_pages": n_pages,
        "processed_pages": len(results),
        "born_digital_pages": sum(1 for p in results if p["extraction_method"].startswith("pypdf")),
        "tesseract_pages": sum(1 for p in results if p["extraction_method"].startswith("tesseract")),
        "docling_pages": sum(1 for p in results if p["extraction_method"].startswith("docling")),
        "error_pages": sum(1 for p in results if p["extraction_method"] == "error"),
    }

    return {"pages": results, "summary": summary}

def process_large_pdf_fast(filepath: str, dpi: int = 150, max_pages: int = None, 
                          max_workers: int = 4) -> Dict:
    """
    Ultra-fast processing for large PDFs (>50 pages)
    """
    reader = PdfReader(filepath)
    n_pages = len(reader.pages)
    pages_to_process = range(n_pages) if max_pages is None else range(min(n_pages, max_pages))

    # Quick operator analysis
    operator_summary = analyze_pdf_ops_fast(filepath)
    
    # Batch convert all images at once
    all_images = convert_from_path(
        filepath, 
        dpi=dpi,
        grayscale=True,
        thread_count=4
    )

    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_page = {
            executor.submit(
                process_page_fast, 
                filepath, 
                i, 
                operator_summary[i] if i < len(operator_summary) else (i+1, 0, 0),
                all_images[i] if i < len(all_images) else None
            ): i 
            for i in pages_to_process
        }
        
        for future in concurrent.futures.as_completed(future_to_page):
            page_index = future_to_page[future]
            try:
                result = future.result()
                results.append(result)
            except Exception as e:
                print(f"Page {page_index + 1} failed: {e}")
                results.append(create_fallback_result(page_index))

    results.sort(key=lambda x: x["pageno"])
    
    summary = {
        "total_pages": n_pages,
        "processed_pages": len(results),
        "born_digital_pages": sum(1 for p in results if "pypdf" in p["extraction_method"]),
        "tesseract_pages": sum(1 for p in results if "tesseract" in p["extraction_method"]),
        "mode": "fast_large_pdf",
    }

    return {"pages": results, "summary": summary}

def process_page_fast(filepath: str, page_index: int, operator_info: Tuple[int, int, int], 
                     pre_converted_image=None) -> Dict:
    """
    Ultra-fast page processing for large PDFs
    """
    reader = PdfReader(filepath)
    page = reader.pages[page_index]
    page_num, text_ops, image_ops = operator_info
    
    # 1) Try PyPDF first (fastest)
    pypdf_text = page.extract_text() or ""
    
    # Simple heuristic: if we found reasonable text, use it
    if len(pypdf_text.strip()) > 100 or text_ops > 5:
        return {
            "pageno": page_index + 1,
            "final_text": pypdf_text,
            "extraction_method": "pypdf-born-digital",
            "scores": {"score": 0.8},
            "text_ops": text_ops,
            "image_ops": image_ops,
        }
    
    # 2) OCR path - only if we have pre-converted image
    if pre_converted_image is not None:
        tesseract_text = ocr_tesseract_image(pre_converted_image)
        combined_text = (pypdf_text + "\n" + tesseract_text).strip()
        
        return {
            "pageno": page_index + 1,
            "final_text": combined_text,
            "extraction_method": "tesseract_fast",
            "scores": {"score": 0.5},
            "text_ops": text_ops,
            "image_ops": image_ops,
        }
    
    # 3) Fallback: minimal text extraction only
    return {
        "pageno": page_index + 1,
        "final_text": pypdf_text,
        "extraction_method": "pypdf_fallback",
        "scores": {"score": 0.3},
        "text_ops": text_ops,
        "image_ops": image_ops,
    }

def process_page_optimized(filepath: str, page_index: int, dpi: int = 150, 
                          pre_converted_image=None, fast_mode: bool = True) -> Dict:
    """
    Optimized page processing using pre-converted images
    """
    reader = PdfReader(filepath)
    page = reader.pages[page_index]
    
    # 1) Try PyPDF first
    pypdf_text = page.extract_text() or ""
    
    if len(pypdf_text.strip()) > 200:
        scores = evaluate_text_quality_fast(pypdf_text) if fast_mode else evaluate_text_quality(pypdf_text)
        return {
            "pageno": page_index + 1,
            "final_text": pypdf_text,
            "extraction_method": "pypdf-born-digital",
            "scores": scores,
            "text_ops": 1,
            "image_ops": 0,
        }
    
    # 2) OCR path
    if pre_converted_image:
        img = pre_converted_image
    else:
        images = convert_from_path(
            filepath, 
            first_page=page_index + 1, 
            last_page=page_index + 1, 
            dpi=dpi,
            grayscale=True
        )
        img = images[0] if images else None
    
    if not img:
        return create_fallback_result(page_index)
    
    tesseract_text = ocr_tesseract_image(img)
    combined_text = (pypdf_text + "\n" + tesseract_text).strip()
    
    if fast_mode:
        scores = evaluate_text_quality_fast(combined_text)
        return {
            "pageno": page_index + 1,
            "final_text": combined_text,
            "extraction_method": "tesseract_fast",
            "scores": scores,
            "text_ops": 0,
            "image_ops": 1,
        }
    else:
        scores = evaluate_text_quality(combined_text)
        return {
            "pageno": page_index + 1,
            "final_text": combined_text,
            "extraction_method": "tesseract",
            "scores": scores,
            "text_ops": 0,
            "image_ops": 1,
        }

def analyze_pdf_needs_ocr(filepath: str, n_pages: int) -> List[bool]:
    """Quick analysis to determine which pages need OCR"""
    reader = PdfReader(filepath)
    needs_ocr = []
    
    for i in range(min(n_pages, 1000)):
        try:
            page = reader.pages[i]
            text = page.extract_text() or ""
            needs_ocr.append(len(text.strip()) < 100)
        except:
            needs_ocr.append(True)
    
    return needs_ocr

def analyze_pdf_ops_fast(filepath: str) -> List[Tuple[int, int, int]]:
    """Faster operator analysis"""
    reader = PdfReader(filepath)
    ops_summary = []
    
    for i, page in enumerate(reader.pages):
        try:
            text = page.extract_text() or ""
            text_ops = 1 if len(text.strip()) > 50 else 0
            
            contents = page.get_contents()
            image_ops = 0
            if contents:
                stream_data = b""
                if isinstance(contents, list):
                    for c in contents[:2]:
                        try:
                            stream_data += c.get_data()[:1000]
                        except:
                            pass
                else:
                    try:
                        stream_data = contents.get_data()[:1000]
                    except:
                        pass
                image_ops = stream_data.count(b"Do")
            
            ops_summary.append((i + 1, text_ops, image_ops))
        except:
            ops_summary.append((i + 1, 0, 1))
    
    return ops_summary

def evaluate_text_quality_fast(text: str) -> Dict[str, float]:
    """Fast quality assessment"""
    if not text:
        return {"score": 0.0}
    
    total_chars = len(text)
    if total_chars == 0:
        return {"score": 0.0}
        
    non_ascii_count = len([c for c in text if ord(c) > 127])
    garbage_ratio = non_ascii_count / total_chars
    
    words = text.split()
    word_count = len(words)
    
    garbage_score = max(0.0, 1.0 - garbage_ratio)
    word_score = min(1.0, word_count / 100)
    
    score = (0.6 * word_score) + (0.4 * garbage_score)
    
    return {
        "score": score,
        "garbage_ratio": garbage_ratio,
        "word_count": word_count,
    }

def create_fallback_result(page_index: int) -> Dict:
    """Create a result dict for failed pages"""
    return {
        "pageno": page_index + 1,
        "final_text": "",
        "extraction_method": "error",
        "scores": {"score": 0.0},
        "text_ops": 0,
        "image_ops": 0,
    }

## EXPERIMENT ON DOCUMENT

In [3]:
input_pdf = "sample/Le-code-du-travail-ivoirien-2023.pdf"

In [4]:
# For your 500-page PDF - this should be 10x faster
out = process_pdf(
    input_pdf, 
    dpi=150, 
    max_workers=10,  # Adjust based on your CPU
    fast_mode=True   # Critical for large PDFs
)

## SEMANTIC CHUNKING

In [5]:
from langchain_experimental.text_splitter import SemanticChunker
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_core.documents import Document

# Initialize the embedding model
embedding_model = HuggingFaceEmbeddings(
    model_name="google/embeddinggemma-300m",
    model_kwargs={'device': 'cpu', "trust_remote_code": True},  # or 'cuda' if you have GPU
    encode_kwargs={'normalize_embeddings': True}
)

# Create the semantic chunker
text_splitter = SemanticChunker(
    embeddings=embedding_model,
    breakpoint_threshold_type="percentile",  # or "standard_deviation", "interquartile"
    breakpoint_threshold_amount=95,  # adjust based on your needs
)

'(MaxRetryError('HTTPSConnectionPool(host=\'huggingface.co\', port=443): Max retries exceeded with url: /google/embeddinggemma-300m/resolve/main/modules.json (Caused by NameResolutionError("<urllib3.connection.HTTPSConnection object at 0x000001F09D522B10>: Failed to resolve \'huggingface.co\' ([Errno 11001] getaddrinfo failed)"))'), '(Request ID: 5267559a-cbb0-4745-8a1d-e37d4dc471e4)')' thrown while requesting HEAD https://huggingface.co/google/embeddinggemma-300m/resolve/main/./modules.json
Retrying in 1s [Retry 1/5].
'(MaxRetryError('HTTPSConnectionPool(host=\'huggingface.co\', port=443): Max retries exceeded with url: /google/embeddinggemma-300m/resolve/main/modules.json (Caused by NameResolutionError("<urllib3.connection.HTTPSConnection object at 0x000001F09BD49FA0>: Failed to resolve \'huggingface.co\' ([Errno 11001] getaddrinfo failed)"))'), '(Request ID: f6ac4aff-f76f-4999-8b8c-eb6f7ebb7855)')' thrown while requesting HEAD https://huggingface.co/google/embeddinggemma-300m/resolv

In [6]:
# Replace your chunking logic with this to preserve page numbers
documents = []

for page in out["pages"]:
    # Create a doc for the specific page
    page_doc = [Document(
        page_content=page["final_text"], 
        metadata={"pageno": page["pageno"], "source": input_pdf}
    )]
    
    page_chunks = text_splitter.split_documents(page_doc)
    documents.extend(page_chunks)

print(f"Created {len(documents)} chunks with metadata.")

Created 1002 chunks with metadata.


## EMBEDDING STRATEGY

In [7]:
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_openai import OpenAIEmbeddings
from langchain_huggingface import HuggingFaceEmbeddings

def init_opensource_embedding(model_name = "google/embeddinggemma-300m"):
    embeddings = HuggingFaceEmbeddings(
            model_name=model_name, # Choose a model from HuggingFace
            model_kwargs={'device': 'cpu', "trust_remote_code": True},  # or 'cuda' if you have GPU
            encode_kwargs={'normalize_embeddings': True}
        )
    print("HuggingFace embedding model initialized.")
    return embeddings

In [9]:
# Example usage:
embeddings = init_opensource_embedding()

HuggingFace embedding model initialized.


## VECTOR DATABASE

In [10]:
from qdrant_client import QdrantClient
from langchain_qdrant import QdrantVectorStore
from qdrant_client.http.models import Distance, VectorParams

client = QdrantClient(url="http://localhost:6333")

In [11]:
from qdrant_client import QdrantClient, models
from qdrant_client.http.exceptions import UnexpectedResponse
from langchain_qdrant import QdrantVectorStore
from langchain_qdrant import FastEmbedSparse, QdrantVectorStore, RetrievalMode
from qdrant_client.http.models import Distance, SparseVectorParams, VectorParams

sparse_embeddings = FastEmbedSparse(model_name="Qdrant/bm25")


def init_qdrant_w_gemma(collection_name, url="http://localhost:6333",vector_size=768, distance_metric=models.Distance.COSINE):
    try:
        client.create_collection(
            collection_name=collection_name,
            vectors_config={"dense": VectorParams(size=vector_size, distance=distance_metric, on_disk=True)},
            sparse_vectors_config={
                "sparse": SparseVectorParams(index=models.SparseIndexParams(on_disk=False))
            },
            quantization_config={
                "scalar": {
                    "type": "int8",  # 4x smaller
                    "quantile": 0.99
                }
            }
        )
        vector_store = QdrantVectorStore(
            client=client,
            collection_name=collection_name,
            embedding=embeddings,
            sparse_embedding=sparse_embeddings,
            retrieval_mode=RetrievalMode.HYBRID,
            vector_name="dense",
            sparse_vector_name="sparse",
        )
        status =  {
            "status": "success",
            "msg": f"collection {collection_name} created successfully"
        }
        print(status)
        return vector_store
    
    except UnexpectedResponse as e:
        if "already exists" in str(e):
            status = {
            "status": "success",
            "msg": f"Collection '{collection_name}' already exists."
            }
            vector_store = QdrantVectorStore.from_existing_collection(
                embedding=embeddings,
                collection_name=collection_name,
                url=url,
                sparse_embedding=sparse_embeddings,
                retrieval_mode=RetrievalMode.HYBRID,
                vector_name="dense",
                sparse_vector_name="sparse"
            )
            print(status)
            return vector_store
        else:
            raise Exception("error initializing vectorstore")


In [12]:
vectorstore = init_qdrant_w_gemma("labor_law")

{'status': 'success', 'msg': 'collection labor_law created successfully'}


In [13]:
from langchain_core.documents import Document

def chunks_to_documents(chunks: list[str], metadata: list[dict] = None) -> list[Document]:
    """Convert chunks to Documents with optional metadata."""
    documents = []
    for i, chunk in enumerate(chunks):
        doc_metadata = metadata[i] if metadata else {"index": i}
        documents.append(Document(page_content=chunk, metadata=doc_metadata))
    return documents

In [14]:
from uuid import uuid4

_ = vectorstore.add_documents(
    documents=documents,
    ids=[str(uuid4()) for _ in documents]
)


## RETRIEVAL

In [26]:
from langchain_community.document_compressors import FlashrankRerank
from langchain_classic.retrievers.contextual_compression import ContextualCompressionRetriever

# Set up once
reranker = FlashrankRerank()

base_retriever = vectorstore.as_retriever(
    search_kwargs={
        "k": 50,
        "hybrid_fusion": models.FusionQuery(fusion=models.Fusion.RRF),
    }
)

# Wrap them together
retriever = ContextualCompressionRetriever(
    base_compressor=reranker,
    base_retriever=base_retriever
)

# Now invoke() handles both stages automatically
def hybrid_search(query, top_k=10):
    results = retriever.invoke(query)
    return results[:top_k]

In [16]:
docs = hybrid_search("quelles sont les conditions nécessaires pour qu'un employeur puisse légalement licencier un salarié ?")
docs

INFO:httpx:HTTP Request: POST http://localhost:6333/collections/labor_law/points/query "HTTP/1.1 200 OK"


[Document(metadata={'id': 12, 'relevance_score': np.float32(0.9996047), 'pageno': 9, 'source': 'sample/Le-code-du-travail-ivoirien-2023.pdf', '_id': '940e5bdb-a7a9-4f92-9180-1eb5bf52675a', '_collection_name': 'labor_law'}, page_content="1\n0 \n \nArt. 5 \nAucun salarié, aucune personne en formation ou en stage ne peut être sanctionné ni \nlicencié pour avoir refusé de subir les agissements de harcèlement moral ou sexuel \nd'un employeur, de son représentant o u de toute personne qui, abusant de l'autorité \nque lui confèrent ses fonctions, a donné des ordres, proféré des menaces, imposé \ndes contraintes ou exercé des pressions de toutes natures sur ce salarié. Aucun salarié, aucune personne en formation ou en stag e ne peut être \nsanctionné ni licencié pour avoir témoigné des agissements définis à l'alinéa \nprécédent ou pour les avoir relatés. Nul ne peut prendre en considération le fait que la personne intéressée a refusé \nde subir les agissements de harcèlement ou q u'une personn

## CONTEXT MANAGEMENT

### OLLAMA

In [18]:
from langchain_ollama import ChatOllama

ollama_llm = ChatOllama(
    model="llama3.2",
    temperature=0.1,
)

### HYBRID RETRIEVAL

In [19]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
from langchain_core.prompts import PromptTemplate

def format_docs(docs):
    return "\n\n".join([d.page_content for d in docs])

def custom_retriever(query: str):
    # You can adjust top_k here if needed
    return hybrid_search(query, top_k=20)

# Format manually as a string prompt
template = """
**Role**
    - you are a helpful assistant and your role is to to answer questions from user.
**Objective** 
    - you'll be given documents to use as source for your answer. 
    - be as faithfull as possible to them.
**Expected solution**
    - a clear and well formated answer with three parts: answer, explication and source.
    
question : {question}
context : {context}
reponse :
"""

prompt = PromptTemplate(
    input_variables=["question", "context"],
    template=template,
)

qa_chain = (
    {
        "context": RunnableLambda(custom_retriever) | format_docs,
        "question": RunnablePassthrough(),
    }
    | prompt
    | ollama_llm
    | StrOutputParser()
)

In [20]:
response = qa_chain.invoke("quelles sont les conditions nécessaires pour qu'un employeur puisse légalement licencier un salarié ?")

INFO:httpx:HTTP Request: POST http://localhost:6333/collections/labor_law/points/query "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST http://127.0.0.1:11434/api/chat "HTTP/1.1 200 OK"


In [21]:
from IPython.display import Markdown

Markdown(response)

**Réponse**

L'employeur peut légalement licencier un salarié sous certaines conditions. Selon l'article 17.4 du Code du Travail, le motif du licenciement peut tenir à la personne du salarié, qu'il s'agisse de son état de santé, de son aptitude à tenir l'emploi, de son insuffisance professionnelle ou de sa conduite fautive.

**Explication**

L'article 17.4 précise que le licenciement peut être motivé par des raisons personnelles, telles que les problèmes de santé ou les difficultés professionnelles. Cependant, il est important de noter que l'employeur doit respecter certaines formalités pour justifier le licenciement, notamment la notification écrite au salarié et à l'inspecteur du travail.

**Source**

* Article 17.4 du Code du Travail : "Le motif du licenciement peut tenir à la personne du salarié, qu'il s'agisse de son état de santé, de son aptitude à tenir l'emploi, de son insuffisance professionnelle ou de sa conduite fautive."
* Article 39 du Code du Travail : "En cas de licenciement par l'employeur, le travailleur ayant accompli dans l'entreprise une durée de service au moins égale à la période de référence ouvrant droit de jouissance au congé tel que fixée par la réglementation en vigueur, a droit à une indemnité de licenciement distincte de l'indemnité compensatrice de préavis."

### HYBRID RETRIEVAL WITH CACHING

In [33]:
# docs[0].metadata['source']

In [34]:
# docs[0].page_content

In [35]:
# {
#     "source": docs[0].metadata['source'],
#     "content" : docs[0].page_content
# }

In [36]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
from langchain_core.prompts import PromptTemplate
from langchain_huggingface import HuggingFaceEmbeddings
from collections import OrderedDict
import numpy as np
import json

import time

# Initialize cache
class LimitedSemanticCache:
    def __init__(self, max_size=100, similarity_threshold=0.80):
        self.cache = OrderedDict()
        self.max_size = max_size
        self.similarity_threshold = similarity_threshold
        self.embeddings = embedding_model
        self.hits = 0
        self.misses = 0
    
    def get(self, query: str):
        """Get cached result if similar query exists."""
        query_embedding = self.embeddings.embed_query(query)
        
        for cached_query, cached_data in self.cache.items():
            similarity = np.dot(query_embedding, cached_data["embedding"]) / (
                np.linalg.norm(query_embedding) * np.linalg.norm(cached_data["embedding"]) + 1e-10
            )
            if similarity >= self.similarity_threshold:
                self.hits += 1
                print(f"✓ Cache HIT ({self.hits} hits, {self.misses} misses)")
                return cached_data["results"]
        
        self.misses += 1
        print(f"✗ Cache MISS ({self.hits} hits, {self.misses} misses)")
        return None
    
    def put(self, query: str, results, embedding):
        """Add to cache, evicting oldest entry if full."""
        if len(self.cache) >= self.max_size:
            removed = self.cache.popitem(last=False)
            print(f"  Cache evicted oldest query: '{removed[0][:30]}...'")
        
        self.cache[query] = {"embedding": embedding, "results": results}
        print(f"  Cache size: {len(self.cache)}/{self.max_size}")

# Initialize cache instance
cache = LimitedSemanticCache(max_size=100, similarity_threshold=0.80)


def format_docs(docs):
    formatted = []
    for d in docs:
        obj = {
            "source": d.metadata.get("source"),
            "content": d.page_content
        }
        formatted.append(json.dumps(obj, ensure_ascii=False, indent=2))
    return "\n\n".join(formatted)

def cached_retriever(query: str):
    """Retriever with semantic caching."""
    # Check cache first
    cached_results = cache.get(query)
    if cached_results:
        return cached_results
    
    # Cache miss: retrieve and store
    results = hybrid_search(query, top_k=20)
    query_embedding = cache.embeddings.embed_query(query)
    cache.put(query, results, query_embedding)
    
    return results

template = """
**Role**
    - you are a helpful assistant and your role is to to answer questions from user.
**Objective** 
    - you'll be given documents to use as source for your answer. 
    - be as faithfull as possible to them.
**Expected solution**
    - a clear and well formated answer with three parts: answer, explication and source.
    
question : {question}
context : {context}
reponse :
"""

prompt = PromptTemplate(
    input_variables=["question", "context"],
    template=template,
)

# Build the chain with cached retriever
qa_chain = (
    {
        "context": RunnableLambda(cached_retriever) | RunnableLambda(format_docs),
        "question": RunnablePassthrough(),
    }
    | prompt
    | ollama_llm
    | StrOutputParser()
)


=== First question ===
✗ Cache MISS (0 hits, 1 misses)


INFO:httpx:HTTP Request: POST http://localhost:6333/collections/labor_law/points/query "HTTP/1.1 200 OK"


  Cache size: 1/100


INFO:httpx:HTTP Request: POST http://127.0.0.1:11434/api/chat "HTTP/1.1 200 OK"


execution time : 25.622926399999415
**Réponse**

**Answer**
Les différents types de contrat de travail sont :

*   Contrat à durée indéterminée (CDI) : il est précédé d'un engagement à l'essai ou peut comporter une clause d'élimination d'une période d'essai avant l'engagement définitif.
*   Contrat à temps partiel : il est précédé d'un engagement à l'essai ou peut comporter une clause d'élimination d'une période d'essai avant l'engagement définitif.
*   Contrat temporaire : il est précédé d'un engagement à l'essai ou peut comporter une clause d'élimination d'une période d'essai avant l'engagement définitif.

**Explication**
Selon l'article 2 du code du travail ivoirien de 2023, le contrat de travail doit comporter les mentions suivantes : la date et le lieu d'établissement du contrat ; les nom, prénoms, profession et domicile de l'employeur ; les nom, prénoms, sexe, date et lieu de naissance, la filiation, le domicile et la nationalité du travailleur, son métier ou sa profession; la na

INFO:httpx:HTTP Request: POST http://127.0.0.1:11434/api/chat "HTTP/1.1 200 OK"


execution time : 4.045568899993668
**Réponse**

**Answer**
Les différents types de contrat de travail en Côte d'Ivoire sont :

*   Contrat de travail précédé d'un engagement à l'essai
*   Contrat de travail avec une clause d'élimination d'une période d'essai
*   Contrat de travail pour les jeunes travailleurs (personnes de moins de 18 ans mais ayant atteint 14 ans)

**Explication**
Selon l'article 2 du Code du Travail Ivoirien, le contrat de travail doit comporter certaines mentions, notamment la date et le lieu d'établissement, les informations sur l'employeur et le travailleur, ainsi que la nature et la durée du contrat. L'article 10 précise que les jeunes travailleurs sont des personnes de moins de 18 ans mais ayant atteint 14 ans.

L'article 11 définit la traite d'enfants comme tout acte de recrutement, de transport, de transfert, d'hébergement ou d'accueil d'enfants à l'intérieur ou à l'extérieur d'un pays, aux fins d'exploitation.

**Source**
*   Article 2 du Code du Travail Ivoi

INFO:httpx:HTTP Request: POST http://localhost:6333/collections/labor_law/points/query "HTTP/1.1 200 OK"


  Cache size: 2/100


INFO:httpx:HTTP Request: POST http://127.0.0.1:11434/api/chat "HTTP/1.1 200 OK"


execution time : 8.707784400001401
**Réponse**

**Answer**
La durée maximale d'un stage est de 12 mois, renouvellements compris.

**Explication**
Selon l'article 13.14 du Code du travail ivoirien de 2023, le contrat de stage de qualification ou d'expérience professionnelle ne peut excéder une durée de douze mois, renouvellements compris. Cela signifie que les entreprises sont tenues à donner au stagiaire une formation pratique pour acquérir une qualification ou une expérience professionnelle pendant cette période.

**Source**
Le Code du travail ivoirien de 2023, article 13.14 et article 14.5.

=== Cache Statistics ===
Total hits: 1
Total misses: 2
Hit rate: 33.3%


In [None]:
# Test it
print("=== First question ===")
start_time = time.perf_counter()
result1 = qa_chain.invoke("quels sont les diffents types de contrat de travail ?")
end_time = time.perf_counter()
execution_time = end_time - start_time
print(f"execution time : {execution_time}")
print(result1)

print("\n=== Similar question (should hit cache) ===")
start_time = time.perf_counter()
result2 = qa_chain.invoke("les differents types de contrat de travail")
end_time = time.perf_counter()
execution_time = end_time - start_time
print(f"execution time : {execution_time}")
print(result2)

print("\n=== Different question (should miss cache) ===")
start_time = time.perf_counter()
result3 = qa_chain.invoke("quelle est la durée maximale d'un stage ?")
end_time = time.perf_counter()
execution_time = end_time - start_time
print(f"execution time : {execution_time}")
print(result3)

# Check stats
print(f"\n=== Cache Statistics ===")
print(f"Total hits: {cache.hits}")
print(f"Total misses: {cache.misses}")
print(f"Hit rate: {cache.hits / (cache.hits + cache.misses) * 100:.1f}%")

### Memory