In [1]:
import re
import sys
from typing import List, Dict, Any, Optional, Tuple
import numpy as np
import traceback
import time

from resources import db, cross_encoder_ustawa as cross_encoder

# Default number of candidates requested from the similarity search
# (default of ``k_sim`` in ``retrieve_final_paragraphs``).
K_SIM: int = 15
# Default number of top-scoring paragraphs kept after reranking
# (default of ``k_final`` in ``retrieve_final_paragraphs``).
K_FINAL: int = 5
# Default minimum rerank score a candidate must reach to be kept
# (default of ``rerank_threshold`` in ``retrieve_final_paragraphs``).
RERANK_THRESHOLD: float = 0.2

# Matches a legal reference such as "art. 33 ust. 2c pkt 3 lit. a".
# Only the article part is mandatory; ustęp / punkt / litera are optional and
# must follow in that order.
#   * The leading \b keeps "art" from matching inside a longer word
#     ("start 5" is not a reference).
#   * A letter suffix ("2c") is accepted only when no further word character
#     follows, so "art 7ust 4" yields article 7 + paragraph 4 instead of the
#     bogus article "7u".
#   * The litera must be a single standalone letter, so the "lit" prefix of an
#     ordinary word ("literatura") is not mistaken for one.
REF_RE_EXT = re.compile(
    r"\bart\.?\s*(?P<art>[0-9]+(?:[a-z](?!\w))?)"
    r"(?:\s*(?:ust(?:\.|ęp)?|ustep)\s*(?P<ust>[0-9]+(?:[a-z](?!\w))?))?"
    r"(?:\s*pkt\.?\s*(?P<pkt>[0-9]+(?:[a-z](?!\w))?))?"
    r"(?:\s*lit\.?\s*(?P<lit>[a-z])(?!\w))?",
    re.IGNORECASE,
)


def parse_ref_ext(query: str) -> Optional[Dict[str, str]]:
    """Extract the first legal reference found in ``query``.

    Args:
        query: Free-text user question; ``None`` or an empty string is
            treated as "no reference".

    Returns:
        A dict holding the lower-cased parts that were present, under the keys
        ``"article"``, ``"paragraph"``, ``"punkt"`` and ``"litera"``, or
        ``None`` when the text contains no reference.
    """
    m = REF_RE_EXT.search(query or "")
    if m is None:
        return None
    # Regex group name -> key used in the returned dict.
    group_to_key = (
        ("art", "article"),
        ("ust", "paragraph"),
        ("pkt", "punkt"),
        ("lit", "litera"),
    )
    ref: Dict[str, str] = {
        key: m.group(group).lower()
        for group, key in group_to_key
        if m.group(group)
    }
    return ref or None

# ------------ diagnostyka / utilsy ------------
def _doc_pid(md: Dict[str, Any]) -> str:
    """Return a printable identifier for a document's metadata dict.

    A truthy ``"id"`` entry wins and is returned as a string. Otherwise the
    identifier is assembled as ``ch<chapter>-art<article>-ust<paragraph>``,
    reading each part from its Polish key first and falling back to the
    English key when the Polish one is missing or falsy.
    """
    explicit_id = md.get("id")
    if explicit_id:
        return str(explicit_id)

    def pick(primary: str, fallback: str) -> Any:
        # Same semantics as ``md.get(primary) or md.get(fallback)``.
        value = md.get(primary)
        return value if value else md.get(fallback)

    chapter = pick("rozdzial", "chapter")
    article = pick("artykul", "article")
    paragraph = pick("ust", "paragraph")
    return "ch{}-art{}-ust{}".format(chapter, article, paragraph)

def _peek_docs(docs: List[Any], n: int = 8) -> None:
    """Print a one-line preview for each of the first ``n`` documents.

    Each line shows the 1-based position, the identifier from ``_doc_pid``,
    the article / paragraph metadata and the first 80 characters of the
    stripped text with newlines replaced by spaces.
    """
    for position, doc in enumerate(docs[:n], start=1):
        meta = dict(getattr(doc, "metadata", {}) or {})
        doc_id = _doc_pid(meta)
        article = meta.get("artykul") or meta.get("article")
        paragraph = meta.get("ust") or meta.get("paragraph")
        text = getattr(doc, "page_content", "") or ""
        # Trim first, cap at 80 characters, then flatten newlines so the
        # preview always stays on a single line.
        snippet = text.strip()[:80].replace("\n", " ")
        print(f"   {position:>2}. [{doc_id}]  art={article} ust={paragraph} | txt_frag='{snippet}'")

def _dup_stats(docs: List[Any]) -> Tuple[int, int]:
    """Count unique and repeated document identifiers.

    Returns:
        ``(unique, duplicates)`` where ``unique`` is the number of distinct
        identifiers produced by ``_doc_pid`` and ``duplicates`` is how many
        documents repeat an identifier that was already seen.
    """
    identifiers = [
        _doc_pid(dict(getattr(doc, "metadata", {}) or {}))
        for doc in docs
    ]
    unique_count = len(set(identifiers))
    # Every document that is not the first occurrence of its id is a repeat.
    return unique_count, len(identifiers) - unique_count

def _sigmoid(x: np.ndarray) -> np.ndarray:
    """Apply the logistic function ``1 / (1 + exp(-x))`` element-wise."""
    denominator = 1.0 + np.exp(-x)
    return 1.0 / denominator

def print_docs(docs: List[Any]) -> None:
    """Print the final paragraphs with their score and location metadata.

    For every document a header line (position, identifier from ``_doc_pid``,
    rerank score, chapter, article, paragraph) is printed, followed by a
    separator and the stripped text. An empty ``docs`` prints a
    "no hits" message instead.

    Args:
        docs: Objects exposing ``metadata`` (dict-like or ``None``) and
            ``page_content`` attributes; missing attributes are tolerated.
    """
    if not docs:
        print("\n[WYNIK] Brak trafień po rerankingu.")
        return
    print("\n[WYNIK] Finalne ustępy:")
    for i, d in enumerate(docs, 1):
        md = dict(getattr(d, "metadata", {}) or {})
        pid = _doc_pid(md)
        sc = md.get("rerank_score")
        # A document that never went through reranking has no score;
        # formatting None with ":.4f" would raise TypeError.
        score_txt = "n/a" if sc is None else f"{sc:.4f}"
        art = md.get("artykul") or md.get("article")
        ust = md.get("ust") or md.get("paragraph")
        roz = md.get("rozdzial") or md.get("chapter")
        print(f"\n{i}. [{pid}] (score={score_txt})  Rozdz.{roz}  Art.{art}  Ust.{ust}")
        print("-" * 80)
        print((getattr(d, "page_content", "") or "").strip())

# ------------ GŁÓWNA FUNKCJA RETRIEVAL z LOGAMI ------------
import numpy as np

def _ce_to_prob(arr: np.ndarray) -> np.ndarray:
    """Normalise raw CrossEncoder scores to values in [0, 1].

    Handling by input shape:
    - empty input             -> 1-D float array with one zero per row
      (an empty 1-D array for empty 1-D input)
    - 1-D, all values in [0, 1] -> returned unchanged
    - 1-D otherwise           -> treated as logits; logistic sigmoid applied
    - 2-D with >= 2 columns   -> row-wise softmax, LAST column returned
    - anything else           -> min-max scaled over the whole batch
      (all zeros when the batch is constant)

    Args:
        arr: Scores as returned by the cross encoder (any array-like).

    Returns:
        Float array of normalised scores.
    """
    arr = np.asarray(arr, dtype=float)
    if arr.size == 0:
        # nanmin/nanmax raise on empty input; nothing to score anyway.
        return np.zeros(arr.shape[:1], dtype=float)
    if arr.ndim == 1:
        # Already within [0, 1] -> leave untouched.
        if np.nanmin(arr) >= 0.0 and np.nanmax(arr) <= 1.0:
            return arr
        # Otherwise treat as logits. Evaluate the sigmoid separately for the
        # two signs so exp() only ever sees non-positive arguments and cannot
        # overflow for large-magnitude logits.
        out = np.empty_like(arr)
        non_negative = arr >= 0
        out[non_negative] = 1.0 / (1.0 + np.exp(-arr[non_negative]))
        exp_rest = np.exp(arr[~non_negative])
        out[~non_negative] = exp_rest / (1.0 + exp_rest)
        return out
    if arr.ndim == 2 and arr.shape[1] >= 2:
        # Softmax over classes (shifted by the row maximum for stability);
        # the last column is taken as the "relevant" probability.
        x = arr - arr.max(axis=1, keepdims=True)
        ex = np.exp(x)
        sm = ex / np.clip(ex.sum(axis=1, keepdims=True), 1e-9, None)
        return sm[:, -1]
    # Unknown layout: fall back to min-max normalisation within the batch.
    mn, mx = float(np.nanmin(arr)), float(np.nanmax(arr))
    if mx - mn < 1e-9:
        return np.zeros_like(arr)
    return (arr - mn) / (mx - mn)

def _build_chroma_filter(ref: Optional[Dict[str, str]]) -> Optional[Dict[str, Any]]:
    """Turn a parsed reference into a metadata filter for the vector store.

    Only the ``"article"`` and ``"paragraph"`` parts are used. A single
    condition is returned as a plain ``{key: value}`` dict; two conditions are
    wrapped in ``{"$and": [...]}`` because Chroma rejects a ``where`` dict
    with more than one top-level key.
    """
    if not ref:
        return None
    conditions = [{key: ref[key]} for key in ("article", "paragraph") if key in ref]
    if not conditions:
        return None
    if len(conditions) == 1:
        return conditions[0]
    return {"$and": conditions}


def retrieve_final_paragraphs(query: str,
                              k_sim: int = K_SIM,
                              k_final: int = K_FINAL,
                              rerank_threshold: Optional[float] = RERANK_THRESHOLD) -> List[Any]:
    """Retrieve candidates for ``query``, rerank them and return the best ones.

    Pipeline (each stage prints a diagnostic line):
      1. similarity search in ``db`` for ``k_sim`` candidates, restricted by an
         article / paragraph filter when the query contains a legal reference;
      2. scoring of every (query, text) pair with ``cross_encoder``;
      3. conversion of raw scores with ``_ce_to_prob`` and, unless
         ``rerank_threshold`` is ``None``, removal of candidates below it;
      4. descending sort and cut to ``k_final``;
      5. the score is stored in each returned document's metadata under
         ``"rerank_score"`` (the document's ``metadata`` attribute is replaced
         by an updated copy).

    Args:
        query: User question.
        k_sim: Number of candidates requested from the similarity search.
        k_final: Maximum number of documents returned.
        rerank_threshold: Minimum normalised score; ``None`` disables the cut.

    Returns:
        Up to ``k_final`` documents ordered by descending score; an empty list
        when the search finds nothing or every candidate falls below the
        threshold.
    """
    # NOTE(review): the filter uses the English keys "article"/"paragraph",
    # while the printing helpers also accept "artykul"/"ust" - confirm which
    # keys (and value types) the collection actually stores.
    chroma_filter = _build_chroma_filter(parse_ref_ext(query))

    # 1) similarity
    docs: List[Any] = db.similarity_search(query, k=k_sim, filter=chroma_filter)
    print(f"[SIM] k={k_sim} filter={chroma_filter} → {len(docs)} kandydatów")
    if not docs:
        print("[SIM] PUSTO → kolekcja pusta / filtr nie pasuje / inny embedder?")
        return []

    # 2) cross-encoder scoring; a None page_content is scored as empty text
    pairs = [(query, getattr(d, "page_content", "") or "") for d in docs]
    raw = cross_encoder.predict(pairs, batch_size=32)
    raw = np.asarray(raw, dtype=float)
    print(f"[CE] raw shape={raw.shape} min={np.nanmin(raw):.4f} max={np.nanmax(raw):.4f} mean={np.nanmean(raw):.4f}")

    probs = _ce_to_prob(raw)
    print(f"[CE] prob min={np.nanmin(probs):.4f} max={np.nanmax(probs):.4f} mean={np.nanmean(probs):.4f}")

    # 3) pair documents with normalised scores and apply the threshold
    scored = [(d, float(p)) for d, p in zip(docs, probs)]

    if rerank_threshold is not None:
        before = len(scored)
        scored = [(d, s) for d, s in scored if s >= rerank_threshold]
        print(f"[THR] >= {rerank_threshold:.3f} → {len(scored)}/{before} kandydatów przechodzi")
        if not scored:
            print("[THR] Wszystko odpadło na progu – obniż próg (np. 0.20–0.40) albo ustaw None na test.")
            return []

    # 4) sort by score (descending) and keep the top k_final
    scored.sort(key=lambda x: x[1], reverse=True)
    picked = scored[:k_final]
    print(f"[PICK] top-{k_final}:")
    for i, (d, s) in enumerate(picked, 1):
        md = dict(getattr(d, "metadata", {}) or {})
        pid = _doc_pid(md)
        art = md.get("artykul") or md.get("article")
        ust = md.get("ust") or md.get("paragraph")
        print(f"   #{i}: [{pid}] prob={s:.4f} | art={art} ust={ust}")

    # 5) store the score as rerank_score on each returned document
    out: List[Any] = []
    for d, s in picked:
        md = dict(getattr(d, "metadata", {}) or {})
        md["rerank_score"] = s
        d.metadata = md
        out.append(d)
    return out


# ------------ CLI / REPL ------------
def main() -> None:
    """Run the interactive question loop.

    Reads questions from standard input, retrieves the final paragraphs for
    each one and prints them. Blank input is ignored; ``exit``, ``quit`` or
    ``q`` (any letter case) or end-of-input ends the loop.
    """
    stop_words = {"exit", "quit", "q"}
    print("RAG probe – wpisz pytanie (ENTER, 'exit' aby zakończyć)")
    while True:
        try:
            question = input("\nTy: ").strip()
        except EOFError:
            return
        if question.lower() in stop_words:
            return
        if question:
            print_docs(retrieve_final_paragraphs(question))

# Start the interactive loop only when this module is run as the main
# program, not when it is imported.
if __name__ == "__main__":
    main()




Loading checkpoint shards:   0%|          | 0/5 [00:00<?, ?it/s]

  inverted_mask = torch.tensor(1.0, dtype=dtype) - expanded_mask


ONNX Cross Encoder załadowany na CPU!


  embedder_u = HuggingFaceBgeEmbeddings(


[CHROMA] Liczba ustępów: 444
RAG probe – wpisz pytanie (ENTER, 'exit' aby zakończyć)
[SIM] k=15 filter=None → 15 kandydatów
[CE] raw shape=(15,) min=-0.5143 max=0.4747 mean=0.0435
[CE] prob min=0.3742 max=0.6165 mean=0.5111
[THR] >= 0.200 → 15/15 kandydatów przechodzi
[PICK] top-5:
   #1: [ch2-art7-ust4] prob=0.6165 | art=7 ust=4
   #2: [ch2-art21-ust5] prob=0.5816 | art=21 ust=5
   #3: [ch2-art21-ust1] prob=0.5572 | art=21 ust=1
   #4: [ch2-art21-ust6] prob=0.5544 | art=21 ust=6
   #5: [ch2-art33-ust2c] prob=0.5474 | art=33 ust=2c

[WYNIK] Finalne ustępy:

1. [ch2-art7-ust4] (score=0.6165)  Rozdz.2  Art.7  Ust.4
--------------------------------------------------------------------------------
Przepisów ust. 2 i 3 nie stosuje się do emerytów i rencistów, którzy mają orzeczoną niezdolność do samodzielnej egzystencji.

2. [ch2-art21-ust5] (score=0.5816)  Rozdz.2  Art.21  Ust.5
--------------------------------------------------------------------------------
Za całkowicie niezdolnego do pra